From 9dde99fb961edc7ffec1520e9feca0cfb4fa2a76 Mon Sep 17 00:00:00 2001
From: James Liu
Date: Mon, 15 Apr 2024 19:37:19 -0700
Subject: [PATCH] Cleanup the multithreaded executor (#12969)

# Objective
Improve the code quality of the multithreaded executor.

## Solution

* Remove some unused variables.
* Use `Mutex::get_mut` where applicable instead of locking.
* Use a `starting_systems` `FixedBitSet` to pre-compute the set of systems with no dependencies, instead of rebuilding it bit-by-bit on every run of the schedule.
* Instead of `FixedBitSet::clear` followed by `FixedBitSet::union_with`, use `FixedBitSet::clone_from`, which performs only a single copy and will not allocate if the target bitset's existing allocation is large enough.
* Replace the `Mutex` around `Conditions` with a `SyncUnsafeCell`, and add a `Context::try_lock` that forces the conditions to be fetched together with, and synchronized by, the lock on the executor state.

This might produce minimal performance gains, but the focus here is on the code quality improvements. Standalone sketches of the patterns named above follow the diffstat.
---
 .../src/schedule/executor/multi_threaded.rs  | 113 ++++++------------
 crates/bevy_utils/src/syncunsafecell.rs      |   4 +-
 2 files changed, 42 insertions(+), 75 deletions(-)
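The bitset and mutex bullets name patterns that stand on their own. A minimal sketch of both, not taken from the patch itself, assuming only the `fixedbitset` crate as a dependency:

```rust
use std::sync::Mutex;

use fixedbitset::FixedBitSet;

fn main() {
    // `clone_from` copies `src` into `dst` in one pass and reuses `dst`'s
    // allocation when it is already large enough, unlike the old
    // `clear()` + `union_with(&src)` pair, which walked the bits twice.
    let mut src = FixedBitSet::with_capacity(8);
    src.insert(3);
    let mut dst = FixedBitSet::with_capacity(8);
    dst.clone_from(&src);
    assert!(dst.contains(3));

    // With exclusive access (`&mut self` in the executor's `run`),
    // `Mutex::get_mut` yields `&mut T` without ever locking: the borrow
    // checker already proves no other thread can hold the guard.
    let mut payload: Mutex<Option<&'static str>> = Mutex::new(None);
    *payload.get_mut().unwrap() = Some("panicked");
    assert_eq!(*payload.get_mut().unwrap(), Some("panicked"));
}
```

`get_mut` still returns a `Result` because the mutex may be poisoned, which is why the `.unwrap()` calls remain in the patch.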
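The last bullet leans on a subtler idea: a field can go without a lock of its own when every mutable access to it is funneled through the acquisition of a different lock. A standalone sketch of that shape, with made-up types (`Shared`, `State`, `Payload`) and std's `UnsafeCell` standing in for Bevy's `SyncUnsafeCell`:

```rust
use std::cell::UnsafeCell;
use std::sync::{Mutex, MutexGuard};

struct State {
    ticks: u64,
}

struct Payload {
    log: Vec<&'static str>,
}

struct Shared {
    state: Mutex<State>,
    // Deliberately not a Mutex: synchronized by `state`'s lock instead.
    payload: UnsafeCell<Payload>,
}

// SAFETY: `payload` is only ever borrowed via `try_lock` below, which couples
// the `&mut Payload` to a held guard on `state`.
unsafe impl Sync for Shared {}

impl Shared {
    // The analogue of `Context::try_lock`: the payload can only be fetched
    // together with the guard, so a single lock synchronizes both fields.
    fn try_lock(&self) -> Option<(&mut Payload, MutexGuard<'_, State>)> {
        let guard = self.state.try_lock().ok()?;
        // SAFETY: exclusive access, synchronized by the lock held in `guard`.
        let payload = unsafe { &mut *self.payload.get() };
        Some((payload, guard))
    }
}

fn main() {
    let shared = Shared {
        state: Mutex::new(State { ticks: 0 }),
        payload: UnsafeCell::new(Payload { log: Vec::new() }),
    };
    if let Some((payload, mut guard)) = shared.try_lock() {
        guard.ticks += 1;
        payload.log.push("ticked");
    }
}
```

As the patch's SAFETY comment states for `Conditions`, this is only sound while `try_lock` remains the single place the payload is fetched mutably, and the borrow must not be used after the guard it was fetched with is dropped.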
diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
index aab22252c7..fa9a190580 100644
--- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
+++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
@@ -1,6 +1,6 @@
 use std::{
     any::Any,
-    sync::{Arc, Mutex},
+    sync::{Arc, Mutex, MutexGuard},
 };
 
 use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
@@ -30,7 +30,7 @@ use super::__rust_begin_short_backtrace;
 struct Environment<'env, 'sys> {
     executor: &'env MultiThreadedExecutor,
     systems: &'sys [SyncUnsafeCell<BoxedSystem>],
-    conditions: Mutex<Conditions<'sys>>,
+    conditions: SyncUnsafeCell<Conditions<'sys>>,
     world_cell: UnsafeWorldCell<'env>,
 }
 
@@ -50,7 +50,7 @@ impl<'env, 'sys> Environment<'env, 'sys> {
         Environment {
             executor,
             systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
-            conditions: Mutex::new(Conditions {
+            conditions: SyncUnsafeCell::new(Conditions {
                 system_conditions: &mut schedule.system_conditions,
                 set_conditions: &mut schedule.set_conditions,
                 sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
@@ -77,7 +77,6 @@ struct SystemTaskMetadata {
 /// The result of running a system that is sent across a channel.
 struct SystemResult {
     system_index: usize,
-    success: bool,
 }
 
 /// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
@@ -90,6 +89,7 @@ pub struct MultiThreadedExecutor {
     apply_final_deferred: bool,
     /// When set, tells the executor that a thread has panicked.
     panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
+    starting_systems: FixedBitSet,
     /// Cached tracing span
     #[cfg(feature = "trace")]
     executor_span: Span,
@@ -105,12 +105,8 @@ pub struct ExecutorState {
     local_thread_running: bool,
     /// Returns `true` if an exclusive system is running.
     exclusive_running: bool,
-    /// The number of systems expected to run.
-    num_systems: usize,
     /// The number of systems that are running.
     num_running_systems: usize,
-    /// The number of systems that have completed.
-    num_completed_systems: usize,
     /// The number of dependencies each system has that have not completed.
     num_dependencies_remaining: Vec<usize>,
     /// System sets whose conditions have been evaluated.
     evaluated_sets: FixedBitSet,
@@ -127,8 +123,6 @@ pub struct ExecutorState {
     completed_systems: FixedBitSet,
     /// Systems that have run but have not had their buffers applied.
     unapplied_systems: FixedBitSet,
-    /// When set, stops the executor from running any more systems.
-    stop_spawning: bool,
 }
 
 /// References to data required by the executor.
@@ -159,6 +153,7 @@ impl SystemExecutor for MultiThreadedExecutor {
         let set_count = schedule.set_ids.len();
 
         self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
+        self.starting_systems = FixedBitSet::with_capacity(sys_count);
         state.evaluated_sets = FixedBitSet::with_capacity(set_count);
         state.ready_systems = FixedBitSet::with_capacity(sys_count);
         state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
@@ -175,6 +170,9 @@ impl SystemExecutor for MultiThreadedExecutor {
                 is_send: schedule.systems[index].is_send(),
                 is_exclusive: schedule.systems[index].is_exclusive(),
             });
+            if schedule.system_dependencies[index] == 0 {
+                self.starting_systems.insert(index);
+            }
         }
 
         state.num_dependencies_remaining = Vec::with_capacity(sys_count);
@@ -188,23 +186,14 @@ impl SystemExecutor for MultiThreadedExecutor {
     ) {
         let state = self.state.get_mut().unwrap();
         // reset counts
-        state.num_systems = schedule.systems.len();
-        if state.num_systems == 0 {
+        if schedule.systems.is_empty() {
             return;
         }
         state.num_running_systems = 0;
-        state.num_completed_systems = 0;
-        state.num_dependencies_remaining.clear();
         state
             .num_dependencies_remaining
-            .extend_from_slice(&schedule.system_dependencies);
-
-        for (system_index, dependencies) in state.num_dependencies_remaining.iter_mut().enumerate()
-        {
-            if *dependencies == 0 {
-                state.ready_systems.insert(system_index);
-            }
-        }
+            .clone_from(&schedule.system_dependencies);
+        state.ready_systems.clone_from(&self.starting_systems);
 
         // If stepping is enabled, make sure we skip those systems that should
         // not be run.
@@ -213,13 +202,12 @@ impl SystemExecutor for MultiThreadedExecutor {
             debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
             // mark skipped systems as completed
             state.completed_systems |= skipped_systems;
-            state.num_completed_systems = state.completed_systems.count_ones(..);
 
             // signal the dependencies for each of the skipped systems, as
            // though they had run
             for system_index in skipped_systems.ones() {
                 state.signal_dependents(system_index);
-                state.ready_systems.set(system_index, false);
+                state.ready_systems.remove(system_index);
             }
         }
 
@@ -251,15 +239,14 @@ impl SystemExecutor for MultiThreadedExecutor {
                 // Commands should be applied while on the scope's thread, not the executor's thread
                 let res = apply_deferred(&state.unapplied_systems, systems, world);
                 if let Err(payload) = res {
-                    let mut panic_payload = self.panic_payload.lock().unwrap();
+                    let panic_payload = self.panic_payload.get_mut().unwrap();
                     *panic_payload = Some(payload);
                 }
                 state.unapplied_systems.clear();
-                debug_assert!(state.unapplied_systems.is_clear());
             }
 
             // check to see if there was a panic
-            let mut payload = self.panic_payload.lock().unwrap();
+            let payload = self.panic_payload.get_mut().unwrap();
             if let Some(payload) = payload.take() {
                 std::panic::resume_unwind(payload);
             }
@@ -288,10 +275,7 @@ impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
         self.environment
             .executor
             .system_completion
-            .push(SystemResult {
-                system_index,
-                success: res.is_ok(),
-            })
+            .push(SystemResult { system_index })
             .unwrap_or_else(|error| unreachable!("{}", error));
         if let Err(payload) = res {
             eprintln!("Encountered a panic in system `{}`!", &*system.name());
@@ -304,6 +288,14 @@ impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
         self.tick_executor();
     }
 
+    fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
+        let guard = self.environment.executor.state.try_lock().ok()?;
+        // SAFETY: This is an exclusive access as no other location fetches conditions mutably, and
+        // is synchronized by the lock on the executor state.
+        let conditions = unsafe { &mut *self.environment.conditions.get() };
+        Some((conditions, guard))
+    }
+
     fn tick_executor(&self) {
         // Ensure that the executor handles any events pushed to the system_completion queue by this thread.
         // If this thread acquires the lock, the executor runs after the push() and they are processed.
@@ -311,10 +303,10 @@ impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
         // after the lock is released, which is after try_lock() failed, which is after the push()
         // on this thread, so the is_empty() check will see the new events and loop.
         loop {
-            let Ok(mut guard) = self.environment.executor.state.try_lock() else {
+            let Some((conditions, mut guard)) = self.try_lock() else {
                 return;
             };
-            guard.tick(self);
+            guard.tick(self, conditions);
             // Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
             drop(guard);
             if self.environment.executor.system_completion.is_empty() {
@@ -332,6 +324,7 @@ impl MultiThreadedExecutor {
         Self {
             state: Mutex::new(ExecutorState::new()),
             system_completion: ConcurrentQueue::unbounded(),
+            starting_systems: FixedBitSet::new(),
             apply_final_deferred: true,
             panic_payload: Mutex::new(None),
             #[cfg(feature = "trace")]
@@ -344,9 +337,7 @@ impl ExecutorState {
     fn new() -> Self {
         Self {
             system_task_metadata: Vec::new(),
-            num_systems: 0,
             num_running_systems: 0,
-            num_completed_systems: 0,
             num_dependencies_remaining: Vec::new(),
             active_access: default(),
             local_thread_running: false,
@@ -358,11 +349,10 @@ impl ExecutorState {
             skipped_systems: FixedBitSet::new(),
             completed_systems: FixedBitSet::new(),
             unapplied_systems: FixedBitSet::new(),
-            stop_spawning: false,
         }
     }
 
-    fn tick(&mut self, context: &Context) {
+    fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
         #[cfg(feature = "trace")]
         let _span = context.environment.executor.executor_span.enter();
 
@@ -376,7 +366,7 @@ impl ExecutorState {
         // - `finish_system_and_handle_dependents` has updated the currently running systems.
         // - `rebuild_active_access` locks access for all currently running systems.
         unsafe {
-            self.spawn_system_tasks(context);
+            self.spawn_system_tasks(context, conditions);
         }
     }
 
@@ -385,17 +375,11 @@ impl ExecutorState {
     /// have been mutably borrowed (such as the systems currently running).
     /// - `world_cell` must have permission to access all world data (not counting
     ///   any world data that is claimed by systems currently running on this executor).
-    unsafe fn spawn_system_tasks(&mut self, context: &Context) {
+    unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
         if self.exclusive_running {
             return;
         }
 
-        let mut conditions = context
-            .environment
-            .conditions
-            .try_lock()
-            .expect("Conditions should only be locked while owning the executor state");
-
         // can't borrow since loop mutably borrows `self`
         let mut ready_systems = std::mem::take(&mut self.ready_systems_copy);
 
@@ -405,11 +389,10 @@ impl ExecutorState {
         while check_for_new_ready_systems {
             check_for_new_ready_systems = false;
 
-            ready_systems.clear();
-            ready_systems.union_with(&self.ready_systems);
+            ready_systems.clone_from(&self.ready_systems);
 
             for system_index in ready_systems.ones() {
-                assert!(!self.running_systems.contains(system_index));
+                debug_assert!(!self.running_systems.contains(system_index));
                 // SAFETY: Caller assured that these systems are not running.
                 // Therefore, no other reference to this system exists and there is no aliasing.
                 let system = unsafe { &mut *context.environment.systems[system_index].get() };
@@ -417,7 +400,7 @@ impl ExecutorState {
                 if !self.can_run(
                     system_index,
                     system,
-                    &mut conditions,
+                    conditions,
                     context.environment.world_cell,
                 ) {
                     // NOTE: exclusive systems with ambiguities are susceptible to
@@ -427,7 +410,7 @@ impl ExecutorState {
                     continue;
                 }
 
-                self.ready_systems.set(system_index, false);
+                self.ready_systems.remove(system_index);
 
                 // SAFETY: `can_run` returned true, which means that:
                 // - It must have called `update_archetype_component_access` for each run condition.
@@ -436,7 +419,7 @@ impl ExecutorState {
                     !self.should_run(
                         system_index,
                         system,
-                        &mut conditions,
+                        conditions,
                         context.environment.world_cell,
                     )
                 } {
@@ -523,11 +506,9 @@ impl ExecutorState {
                 return false;
             }
 
-            // PERF: use an optimized clear() + extend() operation
-            let meta_access =
-                &mut self.system_task_metadata[system_index].archetype_component_access;
-            meta_access.clear();
-            meta_access.extend(system.archetype_component_access());
+            self.system_task_metadata[system_index]
+                .archetype_component_access
+                .clone_from(system.archetype_component_access());
         }
 
         true
@@ -666,10 +647,7 @@ impl ExecutorState {
     }
 
     fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
-        let SystemResult {
-            system_index,
-            success,
-        } = result;
+        let SystemResult { system_index, .. } = result;
 
         if self.system_task_metadata[system_index].is_exclusive {
             self.exclusive_running = false;
@@ -681,20 +659,14 @@ impl ExecutorState {
         }
 
         debug_assert!(self.num_running_systems >= 1);
         self.num_running_systems -= 1;
-        self.num_completed_systems += 1;
-        self.running_systems.set(system_index, false);
+        self.running_systems.remove(system_index);
         self.completed_systems.insert(system_index);
         self.unapplied_systems.insert(system_index);
 
         self.signal_dependents(system_index);
-
-        if !success {
-            self.stop_spawning_systems();
-        }
     }
 
     fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
-        self.num_completed_systems += 1;
         self.completed_systems.insert(system_index);
         self.signal_dependents(system_index);
     }
@@ -710,13 +682,6 @@ impl ExecutorState {
         }
     }
 
-    fn stop_spawning_systems(&mut self) {
-        if !self.stop_spawning {
-            self.num_systems = self.num_completed_systems + self.num_running_systems;
-            self.stop_spawning = true;
-        }
-    }
-
     fn rebuild_active_access(&mut self) {
         self.active_access.clear();
         for index in self.running_systems.ones() {
diff --git a/crates/bevy_utils/src/syncunsafecell.rs b/crates/bevy_utils/src/syncunsafecell.rs
index 40a6ea2799..e447f71226 100644
--- a/crates/bevy_utils/src/syncunsafecell.rs
+++ b/crates/bevy_utils/src/syncunsafecell.rs
@@ -101,12 +101,14 @@ impl<T> SyncUnsafeCell<[T]> {
     /// assert_eq!(slice_cell.len(), 3);
     /// ```
     pub fn as_slice_of_cells(&self) -> &[SyncUnsafeCell<T>] {
+        let self_ptr: *const SyncUnsafeCell<[T]> = ptr::from_ref(self);
+        let slice_ptr = self_ptr as *const [SyncUnsafeCell<T>];
         // SAFETY: `UnsafeCell<T>` and `SyncUnsafeCell<T>` have #[repr(transparent)]
         // therefore:
         // - `SyncUnsafeCell<T>` has the same layout as `T`
         // - `SyncUnsafeCell<[T]>` has the same layout as `[T]`
         // - `SyncUnsafeCell<[T]>` has the same layout as `[SyncUnsafeCell<T>]`
-        unsafe { &*(ptr::from_ref(self) as *const [SyncUnsafeCell<T>]) }
+        unsafe { &*slice_ptr }
     }
 }
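A closing note on the `bevy_utils` hunk: naming the intermediate pointers makes each step of the `#[repr(transparent)]` slice cast visible without changing behavior. The same shape against std's `UnsafeCell`, as a standalone sketch (`as_cell_slice` is a made-up name; a recent toolchain is assumed for `ptr::from_ref` and `UnsafeCell::from_mut`):

```rust
use std::cell::UnsafeCell;
use std::ptr;

fn as_cell_slice<T>(cell: &UnsafeCell<[T]>) -> &[UnsafeCell<T>] {
    // Both fat pointers carry the same slice-length metadata, so the cast
    // preserves it; only the pointee type changes.
    let cell_ptr: *const UnsafeCell<[T]> = ptr::from_ref(cell);
    let slice_ptr = cell_ptr as *const [UnsafeCell<T>];
    // SAFETY: `UnsafeCell<T>` is #[repr(transparent)] over `T`, so
    // `UnsafeCell<[T]>` and `[UnsafeCell<T>]` have identical layouts.
    unsafe { &*slice_ptr }
}

fn main() {
    let mut data = [1, 2, 3];
    let cells = as_cell_slice(UnsafeCell::from_mut(&mut data[..]));
    assert_eq!(cells.len(), 3);
}
```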