diff --git a/crates/bevy_ecs/src/batching.rs b/crates/bevy_ecs/src/batching.rs
new file mode 100644
index 0000000000..5143dfb73e
--- /dev/null
+++ b/crates/bevy_ecs/src/batching.rs
@@ -0,0 +1,108 @@
+//! Types for controlling batching behavior during parallel processing.
+
+use std::ops::Range;
+
+/// Dictates how a parallel operation chunks up large quantities
+/// during iteration.
+///
+/// A parallel query will chunk up large tables and archetypes into
+/// chunks of at most a certain batch size. Similarly, a parallel event
+/// reader will chunk up the remaining events.
+///
+/// By default, this batch size is automatically determined by dividing
+/// the size of the largest matched archetype by the number
+/// of threads (rounded up). This attempts to minimize the overhead of scheduling
+/// tasks onto multiple threads, but assumes each entity has roughly the
+/// same amount of work to be done, which may not hold true in every
+/// workload.
+///
+/// See [`Query::par_iter`], [`EventReader::par_read`] for more information.
+///
+/// [`Query::par_iter`]: crate::system::Query::par_iter
+/// [`EventReader::par_read`]: crate::event::EventReader::par_read
+#[derive(Clone, Debug)]
+pub struct BatchingStrategy {
+    /// The upper and lower limits for a batch of entities.
+    ///
+    /// Setting the bounds to the same value will result in a fixed
+    /// batch size.
+    ///
+    /// Defaults to `[1, usize::MAX]`.
+    pub batch_size_limits: Range<usize>,
+    /// The number of batches per thread in the [`ComputeTaskPool`].
+    /// Increasing this value will decrease the batch size, which may
+    /// increase the scheduling overhead for the iteration.
+    ///
+    /// Defaults to 1.
+    ///
+    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
+    pub batches_per_thread: usize,
+}
+
+impl Default for BatchingStrategy {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl BatchingStrategy {
+    /// Creates a new unconstrained default batching strategy.
+    pub const fn new() -> Self {
+        Self {
+            batch_size_limits: 1..usize::MAX,
+            batches_per_thread: 1,
+        }
+    }
+
+    /// Declares a batching strategy with a fixed batch size.
+    pub const fn fixed(batch_size: usize) -> Self {
+        Self {
+            batch_size_limits: batch_size..batch_size,
+            batches_per_thread: 1,
+        }
+    }
+
+    /// Configures the minimum allowed batch size of this instance.
+    pub const fn min_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size_limits.start = batch_size;
+        self
+    }
+
+    /// Configures the maximum allowed batch size of this instance.
+    pub const fn max_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size_limits.end = batch_size;
+        self
+    }
+
+    /// Configures the number of batches to assign to each thread for this instance.
+    pub fn batches_per_thread(mut self, batches_per_thread: usize) -> Self {
+        assert!(
+            batches_per_thread > 0,
+            "The number of batches per thread must be non-zero."
+        );
+        self.batches_per_thread = batches_per_thread;
+        self
+    }
+
+    /// Calculate the batch size according to the given thread count and max item count.
+    /// The count is provided as a closure so that it can be calculated only if needed.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `thread_count` is 0.
+    ///
+    #[inline]
+    pub fn calc_batch_size(&self, max_items: impl FnOnce() -> usize, thread_count: usize) -> usize {
+        if self.batch_size_limits.is_empty() {
+            return self.batch_size_limits.start;
+        }
+        assert!(
+            thread_count > 0,
+            "Attempted to run parallel iteration with an empty TaskPool"
+        );
+        let batches = thread_count * self.batches_per_thread;
+        // Round up to the nearest batch size.
+        let batch_size = (max_items() + batches - 1) / batches;
+        batch_size.clamp(self.batch_size_limits.start, self.batch_size_limits.end)
+    }
+}
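The arithmetic above can be checked with a small standalone program (not part of this diff; it only uses the `calc_batch_size` API introduced here):

```rust
use bevy_ecs::batching::BatchingStrategy;

fn main() {
    // Default strategy, 4 threads, 1 batch per thread: 1000 items are split
    // into 4 batches, rounded up to 250 items each.
    assert_eq!(BatchingStrategy::default().calc_batch_size(|| 1000, 4), 250);

    // A fixed strategy has an empty `batch_size_limits` range (start == end),
    // so the requested size is returned without consulting the item count.
    assert_eq!(BatchingStrategy::fixed(64).calc_batch_size(|| 1_000_000, 4), 64);

    // The limits clamp the computed size: 10 items over 4 batches would give
    // batches of 3, but the configured minimum of 100 wins.
    let strategy = BatchingStrategy::new().min_batch_size(100);
    assert_eq!(strategy.calc_batch_size(|| 10, 4), 100);
}
```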
diff --git a/crates/bevy_ecs/src/event.rs b/crates/bevy_ecs/src/event.rs
index 5eac04fbcf..f5c84baf6f 100644
--- a/crates/bevy_ecs/src/event.rs
+++ b/crates/bevy_ecs/src/event.rs
@@ -1,6 +1,7 @@
 //! Event handling types.
 
 use crate as bevy_ecs;
+use crate::batching::BatchingStrategy;
 use crate::change_detection::MutUntyped;
 use crate::{
     change_detection::{DetectChangesMut, Mut},
@@ -30,7 +31,7 @@ pub trait Event: Send + Sync + 'static {}
 /// An `EventId` uniquely identifies an event stored in a specific [`World`].
 ///
 /// An `EventId` can among other things be used to trace the flow of an event from the point it was
-/// sent to the point it was processed.
+/// sent to the point it was processed. `EventId`s increase monotonically by send order.
 ///
 /// [`World`]: crate::world::World
 pub struct EventId<E: Event> {
@@ -446,6 +447,46 @@ impl<'w, 's, E: Event> EventReader<'w, 's, E> {
         self.reader.read_with_id(&self.events)
     }
 
+    /// Returns a parallel iterator over the events this [`EventReader`] has not seen yet.
+    /// See also [`for_each`](EventParIter::for_each).
+    ///
+    /// # Example
+    /// ```
+    /// # use bevy_ecs::prelude::*;
+    /// # use std::sync::atomic::{AtomicUsize, Ordering};
+    ///
+    /// #[derive(Event)]
+    /// struct MyEvent {
+    ///     value: usize,
+    /// }
+    ///
+    /// #[derive(Resource, Default)]
+    /// struct Counter(AtomicUsize);
+    ///
+    /// // setup
+    /// let mut world = World::new();
+    /// world.init_resource::<Events<MyEvent>>();
+    /// world.insert_resource(Counter::default());
+    ///
+    /// let mut schedule = Schedule::default();
+    /// schedule.add_systems(|mut events: EventReader<MyEvent>, counter: Res<Counter>| {
+    ///     events.par_read().for_each(|MyEvent { value }| {
+    ///         counter.0.fetch_add(*value, Ordering::Relaxed);
+    ///     });
+    /// });
+    /// for value in 0..100 {
+    ///     world.send_event(MyEvent { value });
+    /// }
+    /// schedule.run(&mut world);
+    /// let Counter(counter) = world.remove_resource::<Counter>().unwrap();
+    /// // all events were processed
+    /// assert_eq!(counter.into_inner(), 4950);
+    /// ```
+    ///
+    pub fn par_read(&mut self) -> EventParIter<'_, E> {
+        self.reader.par_read(&self.events)
+    }
+
     /// Determines the number of events available to be read from this [`EventReader`] without consuming any.
     pub fn len(&self) -> usize {
         self.reader.len(&self.events)
@@ -647,6 +688,11 @@ impl<E: Event> ManualEventReader<E> {
         EventIteratorWithId::new(self, events)
     }
 
+    /// See [`EventReader::par_read`]
+    pub fn par_read<'a>(&'a mut self, events: &'a Events<E>) -> EventParIter<'a, E> {
+        EventParIter::new(self, events)
+    }
+
     /// See [`EventReader::len`]
     pub fn len(&self, events: &Events<E>) -> usize {
         // The number of events in this reader is the difference between the most recent event
@@ -810,6 +856,135 @@ impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
     }
 }
 
+/// A parallel iterator over `Event`s.
+#[derive(Debug)]
+pub struct EventParIter<'a, E: Event> {
+    reader: &'a mut ManualEventReader<E>,
+    slices: [&'a [EventInstance<E>]; 2],
+    batching_strategy: BatchingStrategy,
+}
+
+impl<'a, E: Event> EventParIter<'a, E> {
+    /// Creates a new parallel iterator over `events` that have not yet been seen by `reader`.
+    pub fn new(reader: &'a mut ManualEventReader<E>, events: &'a Events<E>) -> Self {
+        let a_index = reader
+            .last_event_count
+            .saturating_sub(events.events_a.start_event_count);
+        let b_index = reader
+            .last_event_count
+            .saturating_sub(events.events_b.start_event_count);
+        let a = events.events_a.get(a_index..).unwrap_or_default();
+        let b = events.events_b.get(b_index..).unwrap_or_default();
+
+        let unread_count = a.len() + b.len();
+        // Ensure `len` is implemented correctly
+        debug_assert_eq!(unread_count, reader.len(events));
+        reader.last_event_count = events.event_count - unread_count;
+
+        Self {
+            reader,
+            slices: [a, b],
+            batching_strategy: BatchingStrategy::default(),
+        }
+    }
+
+    /// Changes the batching strategy used when iterating.
+    ///
+    /// For more information on how this affects the resultant iteration, see
+    /// [`BatchingStrategy`].
+    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
+        self.batching_strategy = strategy;
+        self
+    }
+
+    /// Runs the provided closure for each unread event in parallel.
+    ///
+    /// Unlike normal iteration, the event order is not guaranteed in any form.
+    ///
+    /// # Panics
+    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
+    /// initialized and run from the ECS scheduler, this should never panic.
+    ///
+    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
+    pub fn for_each<FN: Fn(&'a E) + Send + Sync + Clone>(self, func: FN) {
+        self.for_each_with_id(move |e, _| func(e));
+    }
+
+    /// Runs the provided closure for each unread event in parallel, like [`for_each`](Self::for_each),
+    /// but additionally provides the `EventId` to the closure.
+    ///
+    /// Note that the order of iteration is not guaranteed, but `EventId`s are ordered by send order.
+    ///
+    /// # Panics
+    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
+    /// initialized and run from the ECS scheduler, this should never panic.
+    ///
+    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
+    pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(self, func: FN) {
+        #[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
+        {
+            self.into_iter().for_each(|(e, i)| func(e, i));
+        }
+
+        #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
+        {
+            let pool = bevy_tasks::ComputeTaskPool::get();
+            let thread_count = pool.thread_num();
+            if thread_count <= 1 {
+                return self.into_iter().for_each(|(e, i)| func(e, i));
+            }
+
+            let batch_size = self
+                .batching_strategy
+                .calc_batch_size(|| self.len(), thread_count);
+            let chunks = self.slices.map(|s| s.chunks_exact(batch_size));
+            let remainders = chunks.each_ref().map(|c| c.remainder());
+
+            pool.scope(|scope| {
+                for batch in chunks.into_iter().flatten().chain(remainders) {
+                    let func = func.clone();
+                    scope.spawn(async move {
+                        for event in batch {
+                            func(&event.event, event.event_id);
+                        }
+                    });
+                }
+            });
+        }
+    }
+
+    /// Returns the number of [`Event`]s to be iterated.
+    pub fn len(&self) -> usize {
+        self.slices.iter().map(|s| s.len()).sum()
+    }
+
+    /// Returns [`true`] if there are no events remaining in this iterator.
+    pub fn is_empty(&self) -> bool {
+        self.slices.iter().all(|x| x.is_empty())
+    }
+}
+
+impl<'a, E: Event> IntoIterator for EventParIter<'a, E> {
+    type IntoIter = EventIteratorWithId<'a, E>;
+    type Item = <Self::IntoIter as Iterator>::Item;
+
+    fn into_iter(self) -> Self::IntoIter {
+        let EventParIter {
+            reader,
+            slices: [a, b],
+            ..
+        } = self;
+        let unread = a.len() + b.len();
+        let chain = a.iter().chain(b);
+        EventIteratorWithId {
+            reader,
+            chain,
+            unread,
+        }
+    }
+}
+
+#[doc(hidden)]
 struct RegisteredEvent {
     component_id: ComponentId,
     // Required to flush the secondary buffer and drop events even if left unchanged.
@@ -1326,4 +1501,32 @@ mod tests {
             "Only sent two events; got more than two IDs"
         );
     }
+
+    #[cfg(feature = "multi-threaded")]
+    #[test]
+    fn test_events_par_iter() {
+        use std::{collections::HashSet, sync::mpsc};
+
+        use crate::prelude::*;
+
+        let mut world = World::new();
+        world.init_resource::<Events<TestEvent>>();
+        for i in 0..100 {
+            world.send_event(TestEvent { i });
+        }
+
+        let mut schedule = Schedule::default();
+
+        schedule.add_systems(|mut events: EventReader<TestEvent>| {
+            let (tx, rx) = mpsc::channel();
+            events.par_read().for_each(|event| {
+                tx.send(event.i).unwrap();
+            });
+            drop(tx);
+
+            let observed: HashSet<_> = rx.into_iter().collect();
+            assert_eq!(observed, HashSet::from_iter(0..100));
+        });
+        schedule.run(&mut world);
+    }
 }
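A usage sketch for the API added above (the `Damage` event and `apply_damage` system are hypothetical; this assumes the `ComputeTaskPool` has been initialized, as it is when systems run under a normal Bevy app):

```rust
use bevy_ecs::batching::BatchingStrategy;
use bevy_ecs::prelude::*;

#[derive(Event)]
struct Damage {
    amount: u64,
}

// Processes unread damage events in parallel, in fixed batches of 32.
// Order across batches is not guaranteed, so the per-event work must be
// order-independent.
fn apply_damage(mut events: EventReader<Damage>) {
    events
        .par_read()
        .batching_strategy(BatchingStrategy::fixed(32))
        .for_each(|Damage { amount }| {
            // Expensive, order-independent per-event work goes here.
            std::hint::black_box(amount);
        });
}
```

As with any event, `Damage` would be registered (e.g. via `add_event::<Damage>()` or `init_resource::<Events<Damage>>()`) before the system runs.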
diff --git a/crates/bevy_ecs/src/lib.rs b/crates/bevy_ecs/src/lib.rs
index 7eb5cf1a3b..42f580f591 100644
--- a/crates/bevy_ecs/src/lib.rs
+++ b/crates/bevy_ecs/src/lib.rs
@@ -12,6 +12,7 @@ compile_error!("bevy_ecs cannot safely compile for a 16-bit platform.");
 
 pub mod archetype;
+pub mod batching;
 pub mod bundle;
 pub mod change_detection;
 pub mod component;
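The batch construction in `EventParIter::for_each_with_id` above (two `chunks_exact` iterators plus their remainders) can be seen in isolation, with plain slices standing in for the reader's unread views of the two internal event buffers:

```rust
fn main() {
    // Stand-ins for the unread portions of `events_a` and `events_b`.
    let slices: [&[u32]; 2] = [&[1, 2, 3, 4, 5], &[6, 7]];
    let batch_size = 2;

    let chunks = slices.map(|s| s.chunks_exact(batch_size));
    let remainders = chunks.each_ref().map(|c| c.remainder());

    let batches: Vec<&[u32]> = chunks.into_iter().flatten().chain(remainders).collect();
    // Full-size batches come first, then the two remainders ([5] and the
    // empty tail), so every event lands in exactly one batch.
    assert_eq!(batches, [&[1, 2][..], &[3, 4], &[6, 7], &[5], &[]]);
}
```

This also shows why event order is not preserved: each batch is spawned as its own task, and tasks complete in whatever order the pool schedules them.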
diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs
index 27bc201783..164165cf9e 100644
--- a/crates/bevy_ecs/src/query/par_iter.rs
+++ b/crates/bevy_ecs/src/query/par_iter.rs
@@ -1,89 +1,9 @@
-use crate::{component::Tick, world::unsafe_world_cell::UnsafeWorldCell};
-use std::ops::Range;
+use crate::{
+    batching::BatchingStrategy, component::Tick, world::unsafe_world_cell::UnsafeWorldCell,
+};
 
 use super::{QueryData, QueryFilter, QueryItem, QueryState};
 
-/// Dictates how a parallel query chunks up large tables/archetypes
-/// during iteration.
-///
-/// A parallel query will chunk up large tables and archetypes into
-/// chunks of at most a certain batch size.
-///
-/// By default, this batch size is automatically determined by dividing
-/// the size of the largest matched archetype by the number
-/// of threads (rounded up). This attempts to minimize the overhead of scheduling
-/// tasks onto multiple threads, but assumes each entity has roughly the
-/// same amount of work to be done, which may not hold true in every
-/// workload.
-///
-/// See [`Query::par_iter`] for more information.
-///
-/// [`Query::par_iter`]: crate::system::Query::par_iter
-#[derive(Clone)]
-pub struct BatchingStrategy {
-    /// The upper and lower limits for how large a batch of entities.
-    ///
-    /// Setting the bounds to the same value will result in a fixed
-    /// batch size.
-    ///
-    /// Defaults to `[1, usize::MAX]`.
-    pub batch_size_limits: Range<usize>,
-    /// The number of batches per thread in the [`ComputeTaskPool`].
-    /// Increasing this value will decrease the batch size, which may
-    /// increase the scheduling overhead for the iteration.
-    ///
-    /// Defaults to 1.
-    ///
-    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
-    pub batches_per_thread: usize,
-}
-
-impl BatchingStrategy {
-    /// Creates a new unconstrained default batching strategy.
-    pub const fn new() -> Self {
-        Self {
-            batch_size_limits: 1..usize::MAX,
-            batches_per_thread: 1,
-        }
-    }
-
-    /// Declares a batching strategy with a fixed batch size.
-    pub const fn fixed(batch_size: usize) -> Self {
-        Self {
-            batch_size_limits: batch_size..batch_size,
-            batches_per_thread: 1,
-        }
-    }
-
-    /// Configures the minimum allowed batch size of this instance.
-    pub const fn min_batch_size(mut self, batch_size: usize) -> Self {
-        self.batch_size_limits.start = batch_size;
-        self
-    }
-
-    /// Configures the maximum allowed batch size of this instance.
-    pub const fn max_batch_size(mut self, batch_size: usize) -> Self {
-        self.batch_size_limits.end = batch_size;
-        self
-    }
-
-    /// Configures the number of batches to assign to each thread for this instance.
-    pub fn batches_per_thread(mut self, batches_per_thread: usize) -> Self {
-        assert!(
-            batches_per_thread > 0,
-            "The number of batches per thread must be non-zero."
-        );
-        self.batches_per_thread = batches_per_thread;
-        self
-    }
-}
-
-impl Default for BatchingStrategy {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 /// A parallel iterator over query results of a [`Query`](crate::system::Query).
 ///
 /// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and
@@ -158,37 +78,25 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {
 
     #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
     fn get_batch_size(&self, thread_count: usize) -> usize {
-        if self.batching_strategy.batch_size_limits.is_empty() {
-            return self.batching_strategy.batch_size_limits.start;
-        }
-
-        assert!(
-            thread_count > 0,
-            "Attempted to run parallel iteration over a query with an empty TaskPool"
-        );
-        let id_iter = self.state.matched_storage_ids.iter();
-        let max_size = if D::IS_DENSE && F::IS_DENSE {
-            // SAFETY: We only access table metadata.
-            let tables = unsafe { &self.world.world_metadata().storages().tables };
-            id_iter
-                // SAFETY: The if check ensures that matched_storage_ids stores TableIds
-                .map(|id| unsafe { tables[id.table_id].entity_count() })
-                .max()
-        } else {
-            let archetypes = &self.world.archetypes();
-            id_iter
-                // SAFETY: The if check ensures that matched_storage_ids stores ArchetypeIds
-                .map(|id| unsafe { archetypes[id.archetype_id].len() })
-                .max()
+        let max_items = || {
+            let id_iter = self.state.matched_storage_ids.iter();
+            if D::IS_DENSE && F::IS_DENSE {
+                // SAFETY: We only access table metadata.
+                let tables = unsafe { &self.world.world_metadata().storages().tables };
+                id_iter
+                    // SAFETY: The if check ensures that matched_storage_ids stores TableIds
+                    .map(|id| unsafe { tables[id.table_id].entity_count() })
+                    .max()
+            } else {
+                let archetypes = &self.world.archetypes();
+                id_iter
+                    // SAFETY: The if check ensures that matched_storage_ids stores ArchetypeIds
+                    .map(|id| unsafe { archetypes[id.archetype_id].len() })
+                    .max()
+            }
+            .unwrap_or(0)
        };
-        let max_size = max_size.unwrap_or(0);
-
-        let batches = thread_count * self.batching_strategy.batches_per_thread;
-        // Round up to the nearest batch size.
-        let batch_size = (max_size + batches - 1) / batches;
-        batch_size.clamp(
-            self.batching_strategy.batch_size_limits.start,
-            self.batching_strategy.batch_size_limits.end,
-        )
+        self.batching_strategy
+            .calc_batch_size(max_items, thread_count)
     }
 }
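Note that `get_batch_size` now passes the table/archetype scan as a closure, matching the documented contract that the count is computed only if needed. A fixed strategy returns before the closure runs, which the following standalone sketch demonstrates (the `expensive_count` closure is a hypothetical stand-in for that scan):

```rust
use bevy_ecs::batching::BatchingStrategy;

fn main() {
    // Stand-in for the table/archetype scan in `get_batch_size`.
    let expensive_count = || -> usize { panic!("never evaluated for a fixed batch size") };

    // `fixed(128)` yields an empty `batch_size_limits` range, so the batch
    // size is returned without evaluating the closure; this does not panic.
    assert_eq!(BatchingStrategy::fixed(128).calc_batch_size(expensive_count, 8), 128);
}
```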
diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs
index 9cf1abaab0..69a4e5778d 100644
--- a/crates/bevy_ecs/src/query/state.rs
+++ b/crates/bevy_ecs/src/query/state.rs
@@ -1,11 +1,11 @@
 use crate::{
     archetype::{Archetype, ArchetypeComponentId, ArchetypeGeneration, ArchetypeId},
+    batching::BatchingStrategy,
     component::{ComponentId, Tick},
     entity::Entity,
     prelude::FromWorld,
     query::{
-        Access, BatchingStrategy, DebugCheckedUnwrap, FilteredAccess, QueryCombinationIter,
-        QueryIter, QueryParIter,
+        Access, DebugCheckedUnwrap, FilteredAccess, QueryCombinationIter, QueryIter, QueryParIter,
     },
     storage::{SparseSetIndex, TableId},
     world::{unsafe_world_cell::UnsafeWorldCell, World, WorldId},
diff --git a/crates/bevy_ecs/src/system/query.rs b/crates/bevy_ecs/src/system/query.rs
index 12cb32204f..de4fb95e14 100644
--- a/crates/bevy_ecs/src/system/query.rs
+++ b/crates/bevy_ecs/src/system/query.rs
@@ -1,10 +1,10 @@
 use crate::{
+    batching::BatchingStrategy,
     component::Tick,
     entity::Entity,
     query::{
-        BatchingStrategy, QueryCombinationIter, QueryData, QueryEntityError, QueryFilter,
-        QueryIter, QueryManyIter, QueryParIter, QuerySingleError, QueryState, ROQueryItem,
-        ReadOnlyQueryData,
+        QueryCombinationIter, QueryData, QueryEntityError, QueryFilter, QueryIter, QueryManyIter,
+        QueryParIter, QuerySingleError, QueryState, ROQueryItem, ReadOnlyQueryData,
     },
     world::unsafe_world_cell::UnsafeWorldCell,
 };
diff --git a/examples/ecs/parallel_query.rs b/examples/ecs/parallel_query.rs
index ece2c5d782..8eebb26da1 100644
--- a/examples/ecs/parallel_query.rs
+++ b/examples/ecs/parallel_query.rs
@@ -1,6 +1,6 @@
 //! Illustrates parallel queries with `ParallelIterator`.
 
-use bevy::ecs::query::BatchingStrategy;
+use bevy::ecs::batching::BatchingStrategy;
 use bevy::prelude::*;
 use rand::{Rng, SeedableRng};
 use rand_chacha::ChaCha8Rng;
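For completeness, a hypothetical app-level sketch using the new import path (`Velocity` and `move_system` are illustrative and not part of this diff; `MinimalPlugins` brings up the `ComputeTaskPool` that parallel iteration relies on):

```rust
use bevy::ecs::batching::BatchingStrategy;
use bevy::prelude::*;

#[derive(Component)]
struct Velocity(Vec3);

// Two batches per thread trades a little extra scheduling overhead for
// better load balancing when per-entity work is uneven.
fn move_system(mut query: Query<(&mut Transform, &Velocity)>) {
    query
        .par_iter_mut()
        .batching_strategy(BatchingStrategy::new().batches_per_thread(2))
        .for_each(|(mut transform, velocity)| {
            transform.translation += velocity.0;
        });
}

fn main() {
    App::new()
        .add_plugins(MinimalPlugins)
        .add_systems(Update, move_system)
        .run();
}
```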