diff --git a/crates/bevy_ecs/src/event.rs b/crates/bevy_ecs/src/event.rs index 66c6b361ae..b5ca2ff0e2 100644 --- a/crates/bevy_ecs/src/event.rs +++ b/crates/bevy_ecs/src/event.rs @@ -1,6 +1,7 @@ //! Event handling types. use crate as bevy_ecs; +#[cfg(feature = "multi_threaded")] use crate::batching::BatchingStrategy; use crate::change_detection::MutUntyped; use crate::{ @@ -509,6 +510,7 @@ impl<'w, 's, E: Event> EventReader<'w, 's, E> { /// assert_eq!(counter.into_inner(), 4950); /// ``` /// + #[cfg(feature = "multi_threaded")] pub fn par_read(&mut self) -> EventParIter<'_, E> { self.reader.par_read(&self.events) } @@ -722,6 +724,7 @@ impl ManualEventReader { } /// See [`EventReader::par_read`] + #[cfg(feature = "multi_threaded")] pub fn par_read<'a>(&'a mut self, events: &'a Events) -> EventParIter<'a, E> { EventParIter::new(self, events) } @@ -890,13 +893,16 @@ impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> { } /// A parallel iterator over `Event`s. +#[cfg(feature = "multi_threaded")] #[derive(Debug)] pub struct EventParIter<'a, E: Event> { reader: &'a mut ManualEventReader, slices: [&'a [EventInstance]; 2], batching_strategy: BatchingStrategy, + unread: usize, } +#[cfg(feature = "multi_threaded")] impl<'a, E: Event> EventParIter<'a, E> { /// Creates a new parallel iterator over `events` that have not yet been seen by `reader`. pub fn new(reader: &'a mut ManualEventReader, events: &'a Events) -> Self { @@ -918,6 +924,7 @@ impl<'a, E: Event> EventParIter<'a, E> { reader, slices: [a, b], batching_strategy: BatchingStrategy::default(), + unread: unread_count, } } @@ -953,7 +960,7 @@ impl<'a, E: Event> EventParIter<'a, E> { /// initialized and run from the ECS scheduler, this should never panic. /// /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool - pub fn for_each_with_id) + Send + Sync + Clone>(self, func: FN) { + pub fn for_each_with_id) + Send + Sync + Clone>(mut self, func: FN) { #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] { self.into_iter().for_each(|(e, i)| func(e, i)); @@ -983,6 +990,10 @@ impl<'a, E: Event> EventParIter<'a, E> { }); } }); + + // Events are guaranteed to be read at this point. + self.reader.last_event_count += self.unread; + self.unread = 0; } } @@ -997,6 +1008,7 @@ impl<'a, E: Event> EventParIter<'a, E> { } } +#[cfg(feature = "multi_threaded")] impl<'a, E: Event> IntoIterator for EventParIter<'a, E> { type IntoIter = EventIteratorWithId<'a, E>; type Item = ::Item; @@ -1572,29 +1584,34 @@ mod tests { #[cfg(feature = "multi_threaded")] #[test] fn test_events_par_iter() { - use std::{collections::HashSet, sync::mpsc}; - use crate::prelude::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[derive(Resource)] + struct Counter(AtomicUsize); let mut world = World::new(); world.init_resource::>(); - for i in 0..100 { - world.send_event(TestEvent { i }); + for _ in 0..100 { + world.send_event(TestEvent { i: 1 }); } - let mut schedule = Schedule::default(); - - schedule.add_systems(|mut events: EventReader| { - let (tx, rx) = mpsc::channel(); - events.par_read().for_each(|event| { - tx.send(event.i).unwrap(); - }); - drop(tx); - - let observed: HashSet<_> = rx.into_iter().collect(); - assert_eq!(observed, HashSet::from_iter(0..100)); - }); + schedule.add_systems( + |mut events: EventReader, counter: ResMut| { + events.par_read().for_each(|event| { + counter.0.fetch_add(event.i, Ordering::Relaxed); + }); + }, + ); + world.insert_resource(Counter(AtomicUsize::new(0))); schedule.run(&mut world); + let counter = world.remove_resource::().unwrap(); + assert_eq!(counter.0.into_inner(), 100); + + world.insert_resource(Counter(AtomicUsize::new(0))); + schedule.run(&mut world); + let counter = world.remove_resource::().unwrap(); + assert_eq!(counter.0.into_inner(), 0); } #[test]