bevy/crates/bevy_ecs/src/query/par_iter.rs
Mark Wainwright f0a8994f55
Split WorldQuery into WorldQueryData and WorldQueryFilter (#9918)
# Objective

- Fixes #7680
- This is an updated version of https://github.com/bevyengine/bevy/pull/8899,
which had the same objective but fell a long way behind the latest
changes.


## Solution

The traits `WorldQueryData: WorldQuery` and `WorldQueryFilter:
WorldQuery` have been added, and some of the types and functions from
`WorldQuery` have been moved into them.

`ReadOnlyWorldQuery` has been replaced with `ReadOnlyWorldQueryData`. 

`WorldQueryFilter` is safe (as long as `WorldQuery` is implemented
safely).

`WorldQueryData` is unsafe - safely implementing it requires that
`Self::ReadOnly` is a read-only version of `Self` (this used to be a
safety requirement of `WorldQuery`).

The type parameters `Q` and `F` of `Query` must now implement
`WorldQueryData` and `WorldQueryFilter` respectively.

This makes it impossible to accidentally use a filter in the data
position or vice versa, which could previously lead to bugs.
~~Compile failure tests have been added to check this.~~
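
For generic code written against these traits, migrating is mostly a matter
of swapping bounds. A rough sketch (the helper `count_matches` is illustrative
and not part of this PR):

```rust
use bevy_ecs::prelude::*;
use bevy_ecs::query::{WorldQueryData, WorldQueryFilter};

// old: fn count_matches<Q: WorldQuery, F: ReadOnlyWorldQuery>(query: &Query<Q, F>) -> usize
fn count_matches<Q: WorldQueryData, F: WorldQueryFilter>(query: &Query<Q, F>) -> usize {
    // Read-only iteration works for any data/filter combination.
    query.iter().count()
}
```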

It was previously sometimes useful to use `Option<With<T>>` in the data
position. Use `Has<T>` instead in these cases.

The `WorldQuery` derive macro has been split into separate derive macros
for `WorldQueryData` and `WorldQueryFilter`.

Previously it was possible to derive `WorldQuery` for a struct that
had a mixture of data and filter items. This would not work correctly
in some cases but could be a useful pattern in others. *This is no
longer possible.*
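
For example, a derive that previously mixed data and filter terms now has to
be split into two types, roughly like this (the `Health` and `Enemy` component
names are illustrative):

```rust
// old: one derive mixing data and filter items
#[derive(WorldQuery)]
struct EnemyQuery {
    health: &'static Health,
    _filter: With<Enemy>,
}

// new: the data and the filter become separate types,
// used together as Query<EnemyData, EnemyFilter>
#[derive(WorldQueryData)]
struct EnemyData {
    health: &'static Health,
}

#[derive(WorldQueryFilter)]
struct EnemyFilter {
    _filter: With<Enemy>,
}
```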

---

## Notes

- The changes outside of `bevy_ecs` are all changing type parameters to
the new types, updating the macro use, or replacing `Option<With<T>>`
with `Has<T>`.

- All `WorldQueryData` types always returned `true` for `IS_ARCHETYPAL`,
so I moved it to `WorldQueryFilter` and replaced all calls to it with
`true`. That should be the only logic change outside of the macro
generation code.

- `Changed<T>` and `Added<T>` were being generated by a macro that I
have expanded. Happy to revert that if desired.

- The two derive macros share some functions for implementing
`WorldQuery` but the tidiest way I could find to implement them was to
give them a ton of arguments and ask clippy to ignore that.

## Changelog

### Changed
- Split `WorldQuery` into `WorldQueryData` and `WorldQueryFilter`, which
now have separate derive macros. It is not possible to derive both for
the same type.
- `Query` now requires that the first type argument implements
`WorldQueryData` and the second implements `WorldQueryFilter`.

## Migration Guide

- Update derives

```rust
// old
#[derive(WorldQuery)]
#[world_query(mutable, derive(Debug))]
struct CustomQuery {
    entity: Entity,
    a: &'static mut ComponentA
}

#[derive(WorldQuery)]
struct QueryFilter {
    _c: With<ComponentC>
}

// new 
#[derive(WorldQueryData)]
#[world_query_data(mutable, derive(Debug))]
struct CustomQuery {
    entity: Entity,
    a: &'static mut ComponentA,
}

#[derive(WorldQueryFilter)]
struct QueryFilter {
    _c: With<ComponentC>
}
```
- Replace `Option<With<T>>` with `Has<T>`

```rust
// old
fn my_system(query: Query<(Entity, Option<With<ComponentA>>)>) {
    for (entity, has_a_option) in query.iter() {
        let has_a: bool = has_a_option.is_some();
        //todo!()
    }
}

// new
fn my_system(query: Query<(Entity, Has<ComponentA>)>) {
    for (entity, has_a) in query.iter() {
        //todo!()
    }
}
```

- Fix queries which had filters in the data position or vice versa.

```rust
// old
fn my_system(query: Query<(Entity, With<ComponentA>)>) {
    for (entity, _) in query.iter() {
        //todo!()
    }
}

// new
fn my_system(query: Query<Entity, With<ComponentA>>) {
    for entity in query.iter() {
        //todo!()
    }
}

// old
fn my_system(query: Query<AnyOf<(&ComponentA, With<ComponentB>)>>) {
    for (a, _) in query.iter() {
        //todo!()
    }
}

// new
fn my_system(query: Query<Option<&ComponentA>, Or<(With<ComponentA>, With<ComponentB>)>>) {
    for a in query.iter() {
        //todo!()
    }
}
```

---------

Co-authored-by: Alice Cecile <alice.i.cecile@gmail.com>
2023-11-28 03:56:07 +00:00

use crate::{component::Tick, world::unsafe_world_cell::UnsafeWorldCell};
use std::ops::Range;

use super::{QueryItem, QueryState, WorldQueryData, WorldQueryFilter};

/// Dictates how a parallel query chunks up large tables/archetypes
/// during iteration.
///
/// A parallel query will chunk up large tables and archetypes into
/// chunks of at most a certain batch size.
///
/// By default, this batch size is automatically determined by dividing
/// the size of the largest matched archetype by the number
/// of threads (rounded up). This attempts to minimize the overhead of scheduling
/// tasks onto multiple threads, but assumes each entity has roughly the
/// same amount of work to be done, which may not hold true in every
/// workload.
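///
/// For example, with 100,000 entities in the largest matched archetype,
/// 8 threads, and `batches_per_thread = 1`, the default batch size would
/// be `ceil(100_000 / 8) = 12_500` entities per batch (before clamping
/// to `batch_size_limits`).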
///
/// See [`Query::par_iter`] for more information.
///
/// [`Query::par_iter`]: crate::system::Query::par_iter
#[derive(Clone)]
pub struct BatchingStrategy {
    /// The upper and lower limits for how large a batch of entities can be.
    ///
    /// Setting the bounds to the same value will result in a fixed
    /// batch size.
    ///
    /// Defaults to `[1, usize::MAX]`.
    pub batch_size_limits: Range<usize>,

    /// The number of batches per thread in the [`ComputeTaskPool`].
    /// Increasing this value will decrease the batch size, which may
    /// increase the scheduling overhead for the iteration.
    ///
    /// Defaults to 1.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    pub batches_per_thread: usize,
}

impl BatchingStrategy {
    /// Creates a new unconstrained default batching strategy.
    pub const fn new() -> Self {
        Self {
            batch_size_limits: 1..usize::MAX,
            batches_per_thread: 1,
        }
    }

    /// Declares a batching strategy with a fixed batch size.
    pub const fn fixed(batch_size: usize) -> Self {
        Self {
            batch_size_limits: batch_size..batch_size,
            batches_per_thread: 1,
        }
    }

    /// Configures the minimum allowed batch size of this instance.
    pub const fn min_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size_limits.start = batch_size;
        self
    }

    /// Configures the maximum allowed batch size of this instance.
    pub const fn max_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size_limits.end = batch_size;
        self
    }

    /// Configures the number of batches to assign to each thread for this instance.
    pub fn batches_per_thread(mut self, batches_per_thread: usize) -> Self {
        assert!(
            batches_per_thread > 0,
            "The number of batches per thread must be non-zero."
        );
        self.batches_per_thread = batches_per_thread;
        self
    }
}

/// A parallel iterator over query results of a [`Query`](crate::system::Query).
///
/// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and
/// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut) methods.
pub struct QueryParIter<'w, 's, Q: WorldQueryData, F: WorldQueryFilter> {
    pub(crate) world: UnsafeWorldCell<'w>,
    pub(crate) state: &'s QueryState<Q, F>,
    pub(crate) last_run: Tick,
    pub(crate) this_run: Tick,
    pub(crate) batching_strategy: BatchingStrategy,
}

impl<'w, 's, Q: WorldQueryData, F: WorldQueryFilter> QueryParIter<'w, 's, Q, F> {
    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
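    ///
    /// # Example
    ///
    /// A minimal sketch of overriding the strategy (illustrative only: the
    /// `Position` component and `move_system` are assumptions, not part of this crate).
    ///
    /// ```ignore
    /// fn move_system(mut query: Query<&mut Position>) {
    ///     query
    ///         .par_iter_mut()
    ///         .batching_strategy(BatchingStrategy::fixed(64))
    ///         .for_each(|mut position| {
    ///             position.x += 1.0;
    ///         });
    /// }
    /// ```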
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs `func` on each query result in parallel.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(self, func: FN) {
        #[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
        {
            // SAFETY:
            // This method can only be called once per instance of QueryParIter,
            // which ensures that mutable queries cannot be executed multiple times at once.
            // Mutable instances of QueryParIter can only be created via an exclusive borrow of a
            // Query or a World, which ensures that multiple aliasing QueryParIters cannot exist
            // at the same time.
            unsafe {
                self.state.for_each_unchecked_manual(
                    self.world,
                    func,
                    self.last_run,
                    self.this_run,
                );
            }
        }
        #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
        {
            let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
            if thread_count <= 1 {
                // SAFETY: See the safety comment above.
                unsafe {
                    self.state.for_each_unchecked_manual(
                        self.world,
                        func,
                        self.last_run,
                        self.this_run,
                    );
                }
            } else {
                // Need a batch size of at least 1.
                let batch_size = self.get_batch_size(thread_count).max(1);
                // SAFETY: See the safety comment above.
                unsafe {
                    self.state.par_for_each_unchecked_manual(
                        self.world,
                        batch_size,
                        func,
                        self.last_run,
                        self.this_run,
                    );
                }
            }
        }
    }

    #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
    fn get_batch_size(&self, thread_count: usize) -> usize {
        if self.batching_strategy.batch_size_limits.is_empty() {
            return self.batching_strategy.batch_size_limits.start;
        }
        assert!(
            thread_count > 0,
            "Attempted to run parallel iteration over a query with an empty TaskPool"
        );
        let max_size = if Q::IS_DENSE && F::IS_DENSE {
            // SAFETY: We only access table metadata.
            let tables = unsafe { &self.world.world_metadata().storages().tables };
            self.state
                .matched_table_ids
                .iter()
                .map(|id| tables[*id].entity_count())
                .max()
                .unwrap_or(0)
        } else {
            let archetypes = &self.world.archetypes();
            self.state
                .matched_archetype_ids
                .iter()
                .map(|id| archetypes[*id].len())
                .max()
                .unwrap_or(0)
        };
        let batches = thread_count * self.batching_strategy.batches_per_thread;
        // Round up to the nearest batch size.
        let batch_size = (max_size + batches - 1) / batches;
        batch_size.clamp(
            self.batching_strategy.batch_size_limits.start,
            self.batching_strategy.batch_size_limits.end,
        )
    }
}