implement par_iter_many and par_iter_many_unique (#17815)
# Objective Continuation of #16547. We do not yet have parallel versions of `par_iter_many` and `par_iter_many_unique`. It is currently very painful to try and use parallel iteration over entity lists. Even if a list is not long, each operation might still be very expensive, and worth parallelizing. Plus, it has been requested several times! ## Solution Once again, we implement what we lack! These parallel iterators collect their input entity list into a `Vec`/`UniqueEntityVec`, then chunk that over the available threads, inspired by the original `par_iter`. Since no order guarantee is given to the caller, we could sort the input list according to `EntityLocation`, but that would likely only be worth it for very large entity lists. There is some duplication which could likely be improved, but I'd like to leave that for a follow-up. ## Testing The doc tests on `for_each_init` of `QueryParManyIter` and `QueryParManyUniqueIter`.
This commit is contained in:
parent
96a4028862
commit
05e61d64f5
@ -2433,7 +2433,9 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryIterationCursor<'w, 's, D, F> {
|
||||
}
|
||||
|
||||
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
|
||||
// QueryIter, QueryIterationCursor, QuerySortedIter, QueryManyIter, QuerySortedManyIter, QueryCombinationIter, QueryState::par_fold_init_unchecked_manual
|
||||
// QueryIter, QueryIterationCursor, QuerySortedIter, QueryManyIter, QuerySortedManyIter, QueryCombinationIter,
|
||||
// QueryState::par_fold_init_unchecked_manual, QueryState::par_many_fold_init_unchecked_manual,
|
||||
// QueryState::par_many_unique_fold_init_unchecked_manual
|
||||
/// # Safety
|
||||
/// `tables` and `archetypes` must belong to the same world that the [`QueryIterationCursor`]
|
||||
/// was initialized for.
|
||||
|
@ -1,8 +1,13 @@
|
||||
use crate::{
|
||||
batching::BatchingStrategy, component::Tick, world::unsafe_world_cell::UnsafeWorldCell,
|
||||
batching::BatchingStrategy,
|
||||
component::Tick,
|
||||
entity::{EntityBorrow, TrustedEntityBorrow, UniqueEntityVec},
|
||||
world::unsafe_world_cell::UnsafeWorldCell,
|
||||
};
|
||||
|
||||
use super::{QueryData, QueryFilter, QueryItem, QueryState};
|
||||
use super::{QueryData, QueryFilter, QueryItem, QueryState, ReadOnlyQueryData};
|
||||
|
||||
use alloc::vec::Vec;
|
||||
|
||||
/// A parallel iterator over query results of a [`Query`](crate::system::Query).
|
||||
///
|
||||
@ -54,7 +59,7 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {
|
||||
/// fn system(query: Query<&T>){
|
||||
/// let mut queue: Parallel<usize> = Parallel::default();
|
||||
/// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
|
||||
/// query.par_iter().for_each_init(|| queue.borrow_local_mut(),|local_queue,item| {
|
||||
/// query.par_iter().for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
|
||||
/// **local_queue += 1;
|
||||
/// });
|
||||
///
|
||||
@ -146,3 +151,332 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {
|
||||
.calc_batch_size(max_items, thread_count)
|
||||
}
|
||||
}
|
||||
|
||||
/// A parallel iterator over the unique query items generated from an [`Entity`] list.
|
||||
///
|
||||
/// This struct is created by the [`Query::par_iter_many`] method.
|
||||
///
|
||||
/// [`Entity`]: crate::entity::Entity
|
||||
/// [`Query::par_iter_many`]: crate::system::Query::par_iter_many
|
||||
pub struct QueryParManyIter<'w, 's, D: QueryData, F: QueryFilter, E: EntityBorrow> {
|
||||
pub(crate) world: UnsafeWorldCell<'w>,
|
||||
pub(crate) state: &'s QueryState<D, F>,
|
||||
pub(crate) entity_list: Vec<E>,
|
||||
pub(crate) last_run: Tick,
|
||||
pub(crate) this_run: Tick,
|
||||
pub(crate) batching_strategy: BatchingStrategy,
|
||||
}
|
||||
|
||||
impl<'w, 's, D: ReadOnlyQueryData, F: QueryFilter, E: EntityBorrow + Sync>
|
||||
QueryParManyIter<'w, 's, D, F, E>
|
||||
{
|
||||
/// Changes the batching strategy used when iterating.
|
||||
///
|
||||
/// For more information on how this affects the resultant iteration, see
|
||||
/// [`BatchingStrategy`].
|
||||
pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
|
||||
self.batching_strategy = strategy;
|
||||
self
|
||||
}
|
||||
|
||||
/// Runs `func` on each query result in parallel.
|
||||
///
|
||||
/// # Panics
|
||||
/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
|
||||
/// initialized and run from the ECS scheduler, this should never panic.
|
||||
///
|
||||
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
|
||||
#[inline]
|
||||
pub fn for_each<FN: Fn(QueryItem<'w, D>) + Send + Sync + Clone>(self, func: FN) {
|
||||
self.for_each_init(|| {}, |_, item| func(item));
|
||||
}
|
||||
|
||||
/// Runs `func` on each query result in parallel on a value returned by `init`.
|
||||
///
|
||||
/// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
|
||||
/// Callers should avoid using this function as if it were a parallel version
|
||||
/// of [`Iterator::fold`].
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use bevy_utils::Parallel;
|
||||
/// use crate::{bevy_ecs::prelude::{Component, Res, Resource, Entity}, bevy_ecs::system::Query};
|
||||
/// # use core::slice;
|
||||
/// use bevy_platform_support::prelude::Vec;
|
||||
/// # fn some_expensive_operation(_item: &T) -> usize {
|
||||
/// # 0
|
||||
/// # }
|
||||
///
|
||||
/// #[derive(Component)]
|
||||
/// struct T;
|
||||
///
|
||||
/// #[derive(Resource)]
|
||||
/// struct V(Vec<Entity>);
|
||||
///
|
||||
/// impl<'a> IntoIterator for &'a V {
|
||||
/// // ...
|
||||
/// # type Item = &'a Entity;
|
||||
/// # type IntoIter = slice::Iter<'a, Entity>;
|
||||
/// #
|
||||
/// # fn into_iter(self) -> Self::IntoIter {
|
||||
/// # self.0.iter()
|
||||
/// # }
|
||||
/// }
|
||||
///
|
||||
/// fn system(query: Query<&T>, entities: Res<V>){
|
||||
/// let mut queue: Parallel<usize> = Parallel::default();
|
||||
/// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
|
||||
/// query.par_iter_many(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
|
||||
/// **local_queue += some_expensive_operation(item);
|
||||
/// });
|
||||
///
|
||||
/// // collect value from every thread
|
||||
/// let final_value: usize = queue.iter_mut().map(|v| *v).sum();
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// # Panics
|
||||
/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
|
||||
/// initialized and run from the ECS scheduler, this should never panic.
|
||||
///
|
||||
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
|
||||
#[inline]
|
||||
pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
|
||||
where
|
||||
FN: Fn(&mut T, QueryItem<'w, D>) + Send + Sync + Clone,
|
||||
INIT: Fn() -> T + Sync + Send + Clone,
|
||||
{
|
||||
let func = |mut init, item| {
|
||||
func(&mut init, item);
|
||||
init
|
||||
};
|
||||
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
|
||||
{
|
||||
let init = init();
|
||||
// SAFETY:
|
||||
// This method can only be called once per instance of QueryParManyIter,
|
||||
// which ensures that mutable queries cannot be executed multiple times at once.
|
||||
// Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
|
||||
// Query or a World, which ensures that multiple aliasing QueryParManyIters cannot exist
|
||||
// at the same time.
|
||||
unsafe {
|
||||
self.state
|
||||
.iter_many_unchecked_manual(
|
||||
&self.entity_list,
|
||||
self.world,
|
||||
self.last_run,
|
||||
self.this_run,
|
||||
)
|
||||
.fold(init, func);
|
||||
}
|
||||
}
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
|
||||
{
|
||||
let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
|
||||
if thread_count <= 1 {
|
||||
let init = init();
|
||||
// SAFETY: See the safety comment above.
|
||||
unsafe {
|
||||
self.state
|
||||
.iter_many_unchecked_manual(
|
||||
&self.entity_list,
|
||||
self.world,
|
||||
self.last_run,
|
||||
self.this_run,
|
||||
)
|
||||
.fold(init, func);
|
||||
}
|
||||
} else {
|
||||
// Need a batch size of at least 1.
|
||||
let batch_size = self.get_batch_size(thread_count).max(1);
|
||||
// SAFETY: See the safety comment above.
|
||||
unsafe {
|
||||
self.state.par_many_fold_init_unchecked_manual(
|
||||
init,
|
||||
self.world,
|
||||
&self.entity_list,
|
||||
batch_size,
|
||||
func,
|
||||
self.last_run,
|
||||
self.this_run,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
|
||||
fn get_batch_size(&self, thread_count: usize) -> usize {
|
||||
self.batching_strategy
|
||||
.calc_batch_size(|| self.entity_list.len(), thread_count)
|
||||
}
|
||||
}
|
||||
|
||||
/// A parallel iterator over the unique query items generated from an [`EntitySet`].
|
||||
///
|
||||
/// This struct is created by the [`Query::par_iter_many_unique`] and [`Query::par_iter_many_unique_mut`] methods.
|
||||
///
|
||||
/// [`EntitySet`]: crate::entity::EntitySet
|
||||
/// [`Query::par_iter_many_unique`]: crate::system::Query::par_iter_many_unique
|
||||
/// [`Query::par_iter_many_unique_mut`]: crate::system::Query::par_iter_many_unique_mut
|
||||
pub struct QueryParManyUniqueIter<
|
||||
'w,
|
||||
's,
|
||||
D: QueryData,
|
||||
F: QueryFilter,
|
||||
E: TrustedEntityBorrow + Sync,
|
||||
> {
|
||||
pub(crate) world: UnsafeWorldCell<'w>,
|
||||
pub(crate) state: &'s QueryState<D, F>,
|
||||
pub(crate) entity_list: UniqueEntityVec<E>,
|
||||
pub(crate) last_run: Tick,
|
||||
pub(crate) this_run: Tick,
|
||||
pub(crate) batching_strategy: BatchingStrategy,
|
||||
}
|
||||
|
||||
impl<'w, 's, D: QueryData, F: QueryFilter, E: TrustedEntityBorrow + Sync>
|
||||
QueryParManyUniqueIter<'w, 's, D, F, E>
|
||||
{
|
||||
/// Changes the batching strategy used when iterating.
|
||||
///
|
||||
/// For more information on how this affects the resultant iteration, see
|
||||
/// [`BatchingStrategy`].
|
||||
pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
|
||||
self.batching_strategy = strategy;
|
||||
self
|
||||
}
|
||||
|
||||
/// Runs `func` on each query result in parallel.
|
||||
///
|
||||
/// # Panics
|
||||
/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
|
||||
/// initialized and run from the ECS scheduler, this should never panic.
|
||||
///
|
||||
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
|
||||
#[inline]
|
||||
pub fn for_each<FN: Fn(QueryItem<'w, D>) + Send + Sync + Clone>(self, func: FN) {
|
||||
self.for_each_init(|| {}, |_, item| func(item));
|
||||
}
|
||||
|
||||
/// Runs `func` on each query result in parallel on a value returned by `init`.
|
||||
///
|
||||
/// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
|
||||
/// Callers should avoid using this function as if it were a parallel version
|
||||
/// of [`Iterator::fold`].
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use bevy_utils::Parallel;
|
||||
/// use crate::{bevy_ecs::{prelude::{Component, Res, Resource, Entity}, entity::UniqueEntityVec, system::Query}};
|
||||
/// # use core::slice;
|
||||
/// # use crate::bevy_ecs::entity::UniqueEntityIter;
|
||||
/// # fn some_expensive_operation(_item: &T) -> usize {
|
||||
/// # 0
|
||||
/// # }
|
||||
///
|
||||
/// #[derive(Component)]
|
||||
/// struct T;
|
||||
///
|
||||
/// #[derive(Resource)]
|
||||
/// struct V(UniqueEntityVec<Entity>);
|
||||
///
|
||||
/// impl<'a> IntoIterator for &'a V {
|
||||
/// // ...
|
||||
/// # type Item = &'a Entity;
|
||||
/// # type IntoIter = UniqueEntityIter<slice::Iter<'a, Entity>>;
|
||||
/// #
|
||||
/// # fn into_iter(self) -> Self::IntoIter {
|
||||
/// # self.0.iter()
|
||||
/// # }
|
||||
/// }
|
||||
///
|
||||
/// fn system(query: Query<&T>, entities: Res<V>){
|
||||
/// let mut queue: Parallel<usize> = Parallel::default();
|
||||
/// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
|
||||
/// query.par_iter_many_unique(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
|
||||
/// **local_queue += some_expensive_operation(item);
|
||||
/// });
|
||||
///
|
||||
/// // collect value from every thread
|
||||
/// let final_value: usize = queue.iter_mut().map(|v| *v).sum();
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// # Panics
|
||||
/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
|
||||
/// initialized and run from the ECS scheduler, this should never panic.
|
||||
///
|
||||
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
|
||||
#[inline]
|
||||
pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
|
||||
where
|
||||
FN: Fn(&mut T, QueryItem<'w, D>) + Send + Sync + Clone,
|
||||
INIT: Fn() -> T + Sync + Send + Clone,
|
||||
{
|
||||
let func = |mut init, item| {
|
||||
func(&mut init, item);
|
||||
init
|
||||
};
|
||||
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
|
||||
{
|
||||
let init = init();
|
||||
// SAFETY:
|
||||
// This method can only be called once per instance of QueryParManyUniqueIter,
|
||||
// which ensures that mutable queries cannot be executed multiple times at once.
|
||||
// Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
|
||||
// Query or a World, which ensures that multiple aliasing QueryParManyUniqueIters cannot exist
|
||||
// at the same time.
|
||||
unsafe {
|
||||
self.state
|
||||
.iter_many_unique_unchecked_manual(
|
||||
self.entity_list,
|
||||
self.world,
|
||||
self.last_run,
|
||||
self.this_run,
|
||||
)
|
||||
.fold(init, func);
|
||||
}
|
||||
}
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
|
||||
{
|
||||
let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
|
||||
if thread_count <= 1 {
|
||||
let init = init();
|
||||
// SAFETY: See the safety comment above.
|
||||
unsafe {
|
||||
self.state
|
||||
.iter_many_unique_unchecked_manual(
|
||||
self.entity_list,
|
||||
self.world,
|
||||
self.last_run,
|
||||
self.this_run,
|
||||
)
|
||||
.fold(init, func);
|
||||
}
|
||||
} else {
|
||||
// Need a batch size of at least 1.
|
||||
let batch_size = self.get_batch_size(thread_count).max(1);
|
||||
// SAFETY: See the safety comment above.
|
||||
unsafe {
|
||||
self.state.par_many_unique_fold_init_unchecked_manual(
|
||||
init,
|
||||
self.world,
|
||||
&self.entity_list,
|
||||
batch_size,
|
||||
func,
|
||||
self.last_run,
|
||||
self.this_run,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
|
||||
fn get_batch_size(&self, thread_count: usize) -> usize {
|
||||
self.batching_strategy
|
||||
.calc_batch_size(|| self.entity_list.len(), thread_count)
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,9 @@ use crate::{
|
||||
world::{unsafe_world_cell::UnsafeWorldCell, World, WorldId},
|
||||
};
|
||||
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
|
||||
use crate::entity::{TrustedEntityBorrow, UniqueEntitySlice};
|
||||
|
||||
use alloc::vec::Vec;
|
||||
use core::{fmt, mem::MaybeUninit, ptr};
|
||||
use fixedbitset::FixedBitSet;
|
||||
@ -23,7 +26,7 @@ use tracing::Span;
|
||||
|
||||
use super::{
|
||||
NopWorldQuery, QueryBuilder, QueryData, QueryEntityError, QueryFilter, QueryManyIter,
|
||||
QueryManyUniqueIter, QuerySingleError, ROQueryItem,
|
||||
QueryManyUniqueIter, QuerySingleError, ROQueryItem, ReadOnlyQueryData,
|
||||
};
|
||||
|
||||
/// An ID for either a table or an archetype. Used for Query iteration.
|
||||
@ -1788,7 +1791,8 @@ impl<D: QueryData, F: QueryFilter> QueryState<D, F> {
|
||||
INIT: Fn() -> T + Sync + Send + Clone,
|
||||
{
|
||||
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
|
||||
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter,QueryState::par_fold_init_unchecked_manual
|
||||
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter,QueryState::par_fold_init_unchecked_manual,
|
||||
// QueryState::par_many_fold_init_unchecked_manual, QueryState::par_many_unique_fold_init_unchecked_manual
|
||||
use arrayvec::ArrayVec;
|
||||
|
||||
bevy_tasks::ComputeTaskPool::get().scope(|scope| {
|
||||
@ -1868,6 +1872,128 @@ impl<D: QueryData, F: QueryFilter> QueryState<D, F> {
|
||||
});
|
||||
}
|
||||
|
||||
/// Runs `func` on each query result in parallel for the given [`EntitySet`],
|
||||
/// where the last change and the current change tick are given. This is faster than the
|
||||
/// equivalent `iter_many_unique()` method, but cannot be chained like a normal [`Iterator`].
|
||||
///
|
||||
/// # Panics
|
||||
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
|
||||
/// initialized and run from the ECS scheduler, this should never panic.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// This does not check for mutable query correctness. To be safe, make sure mutable queries
|
||||
/// have unique access to the components they query.
|
||||
/// This does not validate that `world.id()` matches `self.world_id`. Calling this on a `world`
|
||||
/// with a mismatched [`WorldId`] is unsound.
|
||||
///
|
||||
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
|
||||
pub(crate) unsafe fn par_many_unique_fold_init_unchecked_manual<'w, T, FN, INIT, E>(
|
||||
&self,
|
||||
init_accum: INIT,
|
||||
world: UnsafeWorldCell<'w>,
|
||||
entity_list: &UniqueEntitySlice<E>,
|
||||
batch_size: usize,
|
||||
mut func: FN,
|
||||
last_run: Tick,
|
||||
this_run: Tick,
|
||||
) where
|
||||
FN: Fn(T, D::Item<'w>) -> T + Send + Sync + Clone,
|
||||
INIT: Fn() -> T + Sync + Send + Clone,
|
||||
E: TrustedEntityBorrow + Sync,
|
||||
{
|
||||
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
|
||||
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter,QueryState::par_fold_init_unchecked_manual
|
||||
// QueryState::par_many_fold_init_unchecked_manual, QueryState::par_many_unique_fold_init_unchecked_manual
|
||||
|
||||
bevy_tasks::ComputeTaskPool::get().scope(|scope| {
|
||||
let chunks = entity_list.chunks_exact(batch_size);
|
||||
let remainder = chunks.remainder();
|
||||
|
||||
for batch in chunks {
|
||||
let mut func = func.clone();
|
||||
let init_accum = init_accum.clone();
|
||||
scope.spawn(async move {
|
||||
#[cfg(feature = "trace")]
|
||||
let _span = self.par_iter_span.enter();
|
||||
let accum = init_accum();
|
||||
self.iter_many_unique_unchecked_manual(batch, world, last_run, this_run)
|
||||
.fold(accum, &mut func);
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(feature = "trace")]
|
||||
let _span = self.par_iter_span.enter();
|
||||
let accum = init_accum();
|
||||
self.iter_many_unique_unchecked_manual(remainder, world, last_run, this_run)
|
||||
.fold(accum, &mut func);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: ReadOnlyQueryData, F: QueryFilter> QueryState<D, F> {
|
||||
/// Runs `func` on each read-only query result in parallel for the given [`Entity`] list,
|
||||
/// where the last change and the current change tick are given. This is faster than the equivalent
|
||||
/// `iter_many()` method, but cannot be chained like a normal [`Iterator`].
|
||||
///
|
||||
/// # Panics
|
||||
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
|
||||
/// initialized and run from the ECS scheduler, this should never panic.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// This does not check for mutable query correctness. To be safe, make sure mutable queries
|
||||
/// have unique access to the components they query.
|
||||
/// This does not validate that `world.id()` matches `self.world_id`. Calling this on a `world`
|
||||
/// with a mismatched [`WorldId`] is unsound.
|
||||
///
|
||||
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
|
||||
pub(crate) unsafe fn par_many_fold_init_unchecked_manual<'w, T, FN, INIT, E>(
|
||||
&self,
|
||||
init_accum: INIT,
|
||||
world: UnsafeWorldCell<'w>,
|
||||
entity_list: &[E],
|
||||
batch_size: usize,
|
||||
mut func: FN,
|
||||
last_run: Tick,
|
||||
this_run: Tick,
|
||||
) where
|
||||
FN: Fn(T, D::Item<'w>) -> T + Send + Sync + Clone,
|
||||
INIT: Fn() -> T + Sync + Send + Clone,
|
||||
E: EntityBorrow + Sync,
|
||||
{
|
||||
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
|
||||
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::par_fold_init_unchecked_manual
|
||||
// QueryState::par_many_fold_init_unchecked_manual, QueryState::par_many_unique_fold_init_unchecked_manual
|
||||
|
||||
bevy_tasks::ComputeTaskPool::get().scope(|scope| {
|
||||
let chunks = entity_list.chunks_exact(batch_size);
|
||||
let remainder = chunks.remainder();
|
||||
|
||||
for batch in chunks {
|
||||
let mut func = func.clone();
|
||||
let init_accum = init_accum.clone();
|
||||
scope.spawn(async move {
|
||||
#[cfg(feature = "trace")]
|
||||
let _span = self.par_iter_span.enter();
|
||||
let accum = init_accum();
|
||||
self.iter_many_unchecked_manual(batch, world, last_run, this_run)
|
||||
.fold(accum, &mut func);
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(feature = "trace")]
|
||||
let _span = self.par_iter_span.enter();
|
||||
let accum = init_accum();
|
||||
self.iter_many_unchecked_manual(remainder, world, last_run, this_run)
|
||||
.fold(accum, &mut func);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: QueryData, F: QueryFilter> QueryState<D, F> {
|
||||
/// Returns a single immutable query result when there is exactly one entity matching
|
||||
/// the query.
|
||||
///
|
||||
|
@ -4,8 +4,8 @@ use crate::{
|
||||
entity::{Entity, EntityBorrow, EntitySet},
|
||||
query::{
|
||||
QueryCombinationIter, QueryData, QueryEntityError, QueryFilter, QueryIter, QueryManyIter,
|
||||
QueryManyUniqueIter, QueryParIter, QuerySingleError, QueryState, ROQueryItem,
|
||||
ReadOnlyQueryData,
|
||||
QueryManyUniqueIter, QueryParIter, QueryParManyIter, QueryParManyUniqueIter,
|
||||
QuerySingleError, QueryState, ROQueryItem, ReadOnlyQueryData,
|
||||
},
|
||||
world::unsafe_world_cell::UnsafeWorldCell,
|
||||
};
|
||||
@ -1082,6 +1082,94 @@ impl<'w, 's, D: QueryData, F: QueryFilter> Query<'w, 's, D, F> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over the read-only query items generated from an [`Entity`] list.
|
||||
///
|
||||
/// Entities that don't match the query are skipped. Iteration order and thread assignment is not guaranteed.
|
||||
///
|
||||
/// If the `multithreaded` feature is disabled, iterating with this operates identically to [`Iterator::for_each`]
|
||||
/// on [`QueryManyIter`].
|
||||
///
|
||||
/// This can only be called for read-only queries. To avoid potential aliasing, there is no `par_iter_many_mut` equivalent.
|
||||
/// See [`par_iter_many_unique_mut`] for an alternative using [`EntitySet`].
|
||||
///
|
||||
/// Note that you must use the `for_each` method to iterate over the
|
||||
/// results, see [`par_iter_mut`] for an example.
|
||||
///
|
||||
/// [`par_iter_many_unique_mut`]: Self::par_iter_many_unique_mut
|
||||
/// [`par_iter_mut`]: Self::par_iter_mut
|
||||
#[inline]
|
||||
pub fn par_iter_many<EntityList: IntoIterator<Item: EntityBorrow>>(
|
||||
&self,
|
||||
entities: EntityList,
|
||||
) -> QueryParManyIter<'_, '_, D::ReadOnly, F, EntityList::Item> {
|
||||
QueryParManyIter {
|
||||
world: self.world,
|
||||
state: self.state.as_readonly(),
|
||||
entity_list: entities.into_iter().collect(),
|
||||
last_run: self.last_run,
|
||||
this_run: self.this_run,
|
||||
batching_strategy: BatchingStrategy::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over the unique read-only query items generated from an [`EntitySet`].
|
||||
///
|
||||
/// Entities that don't match the query are skipped. Iteration order and thread assignment is not guaranteed.
|
||||
///
|
||||
/// If the `multithreaded` feature is disabled, iterating with this operates identically to [`Iterator::for_each`]
|
||||
/// on [`QueryManyUniqueIter`].
|
||||
///
|
||||
/// This can only be called for read-only queries, see [`par_iter_many_unique_mut`] for write-queries.
|
||||
///
|
||||
/// Note that you must use the `for_each` method to iterate over the
|
||||
/// results, see [`par_iter_mut`] for an example.
|
||||
///
|
||||
/// [`par_iter_many_unique_mut`]: Self::par_iter_many_unique_mut
|
||||
/// [`par_iter_mut`]: Self::par_iter_mut
|
||||
#[inline]
|
||||
pub fn par_iter_many_unique<EntityList: EntitySet<Item: Sync>>(
|
||||
&self,
|
||||
entities: EntityList,
|
||||
) -> QueryParManyUniqueIter<'_, '_, D::ReadOnly, F, EntityList::Item> {
|
||||
QueryParManyUniqueIter {
|
||||
world: self.world,
|
||||
state: self.state.as_readonly(),
|
||||
entity_list: entities.into_iter().collect(),
|
||||
last_run: self.last_run,
|
||||
this_run: self.this_run,
|
||||
batching_strategy: BatchingStrategy::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over the unique query items generated from an [`EntitySet`].
|
||||
///
|
||||
/// Entities that don't match the query are skipped. Iteration order and thread assignment is not guaranteed.
|
||||
///
|
||||
/// If the `multithreaded` feature is disabled, iterating with this operates identically to [`Iterator::for_each`]
|
||||
/// on [`QueryManyUniqueIter`].
|
||||
///
|
||||
/// This can only be called for mutable queries, see [`par_iter_many_unique`] for read-only-queries.
|
||||
///
|
||||
/// Note that you must use the `for_each` method to iterate over the
|
||||
/// results, see [`par_iter_mut`] for an example.
|
||||
///
|
||||
/// [`par_iter_many_unique`]: Self::par_iter_many_unique
|
||||
/// [`par_iter_mut`]: Self::par_iter_mut
|
||||
#[inline]
|
||||
pub fn par_iter_many_unique_mut<EntityList: EntitySet<Item: Sync>>(
|
||||
&mut self,
|
||||
entities: EntityList,
|
||||
) -> QueryParManyUniqueIter<'_, '_, D, F, EntityList::Item> {
|
||||
QueryParManyUniqueIter {
|
||||
world: self.world,
|
||||
state: self.state,
|
||||
entity_list: entities.into_iter().collect(),
|
||||
last_run: self.last_run,
|
||||
this_run: self.this_run,
|
||||
batching_strategy: BatchingStrategy::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the read-only query item for the given [`Entity`].
|
||||
///
|
||||
/// In case of a nonexisting entity or mismatched component, a [`QueryEntityError`] is returned instead.
|
||||
|
Loading…
Reference in New Issue
Block a user