diff --git a/crates/bevy_ecs/src/schedule/parallel_executor.rs b/crates/bevy_ecs/src/schedule/parallel_executor.rs index a29d6b96dd..d5dce1c79b 100644 --- a/crates/bevy_ecs/src/schedule/parallel_executor.rs +++ b/crates/bevy_ecs/src/schedule/parallel_executor.rs @@ -7,8 +7,12 @@ use crossbeam_channel::{Receiver, Sender}; use fixedbitset::FixedBitSet; use hecs::{ArchetypesGeneration, World}; use rayon::ScopeFifo; -use std::sync::{Arc, Mutex}; +use std::{ + ops::Range, + sync::{Arc, Mutex}, +}; +#[derive(Debug)] pub struct ParallelExecutor { stages: Vec, last_schedule_generation: usize, @@ -33,47 +37,38 @@ impl ParallelExecutor { } } - pub fn prepare(&mut self, schedule: &mut Schedule, world: &World) { + pub fn run(&mut self, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) { let schedule_generation = schedule.generation(); - let schedule_changed = schedule_generation != self.last_schedule_generation; - + let schedule_changed = schedule.generation() != self.last_schedule_generation; if schedule_changed { self.stages.clear(); self.stages .resize_with(schedule.stage_order.len(), || ExecutorStage::default()); } - - for (stage_index, stage_name) in schedule.stage_order.iter().enumerate() { - let executor_stage = &mut self.stages[stage_index]; - if let Some(systems) = schedule.stages.get(stage_name) { - executor_stage.prepare(world, systems, schedule_changed); - } - } - - self.last_schedule_generation = schedule_generation; - } - - pub fn run(&mut self, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) { - self.prepare(schedule, world); for (stage_name, executor_stage) in schedule.stage_order.iter().zip(self.stages.iter_mut()) { if let Some(stage_systems) = schedule.stages.get_mut(stage_name) { - executor_stage.run(world, resources, stage_systems); + executor_stage.run(world, resources, stage_systems, schedule_changed); } } if self.clear_trackers { world.clear_trackers(); } + + self.last_schedule_generation = schedule_generation; } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct ExecutorStage { /// each system's set of dependencies system_dependencies: Vec, /// each system's dependents (the systems that can't run until this system has run) system_dependents: Vec>, + /// stores the indices of thread local systems in this stage, which are used during stage.prepare() + thread_local_system_indices: Vec, + next_thread_local_index: usize, /// the currently finished systems finished_systems: FixedBitSet, running_systems: FixedBitSet, @@ -89,6 +84,8 @@ impl Default for ExecutorStage { Self { system_dependents: Default::default(), system_dependencies: Default::default(), + thread_local_system_indices: Default::default(), + next_thread_local_index: 0, finished_systems: Default::default(), running_systems: Default::default(), sender, @@ -104,48 +101,54 @@ enum RunReadyResult { } enum RunReadyType { - All, + Range(Range), Dependents(usize), } impl ExecutorStage { - // TODO: add a start_index parameter here - pub fn prepare( + pub fn prepare_to_next_thread_local( &mut self, world: &World, - systems: &Vec>>>, + systems: &[Arc>>], schedule_changed: bool, ) { - // if the schedule has changed, clear executor state / fill it with new defaults - if schedule_changed { - self.system_dependencies.clear(); - self.system_dependencies - .resize_with(systems.len(), || FixedBitSet::with_capacity(systems.len())); + let (prepare_system_start_index, last_thread_local_index) = + if self.next_thread_local_index == 0 { + (0, None) + } else { + // start right after the last thread local system + ( + self.thread_local_system_indices[self.next_thread_local_index - 1] + 1, + Some(self.thread_local_system_indices[self.next_thread_local_index - 1]), + ) + }; - self.system_dependents.clear(); - self.system_dependents.resize(systems.len(), Vec::new()); + let prepare_system_index_range = if let Some(index) = self + .thread_local_system_indices + .get(self.next_thread_local_index) + { + // if there is an upcoming thread local system, prepare up to (and including) it + prepare_system_start_index..(*index + 1) + } else { + // if there are no upcoming thread local systems, prepare everything right now + prepare_system_start_index..systems.len() + }; - self.finished_systems.grow(systems.len()); - self.running_systems.grow(systems.len()); - } - - let archetypes_generation = world.archetypes_generation(); let archetypes_generation_changed = - self.last_archetypes_generation != archetypes_generation; + self.last_archetypes_generation != world.archetypes_generation(); if schedule_changed || archetypes_generation_changed { // update each system's archetype access to latest world archetypes - for system in systems.iter() { - let mut system = system.lock().unwrap(); + for system_index in prepare_system_index_range.clone() { + let mut system = systems[system_index].lock().unwrap(); system.update_archetype_access(world); } // calculate dependencies between systems and build execution order let mut current_archetype_access = ArchetypeAccess::default(); let mut current_resource_access = TypeAccess::default(); - let mut last_thread_local_index: Option = None; - for (system_index, system) in systems.iter().enumerate() { - let system = system.lock().unwrap(); + for system_index in prepare_system_index_range.clone() { + let system = systems[system_index].lock().unwrap(); let archetype_access = system.archetype_access(); match system.thread_local_execution() { ThreadLocalExecution::NextFlush => { @@ -154,15 +157,16 @@ impl ExecutorStage { if current_archetype_access.is_compatible(archetype_access) == false || current_resource_access.is_compatible(resource_access) == false { - for earlier_system_index in 0..system_index { + for earlier_system_index in + prepare_system_index_range.start..system_index + { let earlier_system = systems[earlier_system_index].lock().unwrap(); - // ignore "immediate" thread local systems, we handle them separately - if let ThreadLocalExecution::Immediate = - earlier_system.thread_local_execution() - { - continue; - } + // due to how prepare ranges work, previous systems should all be "NextFlush" + debug_assert_eq!( + earlier_system.thread_local_execution(), + ThreadLocalExecution::NextFlush + ); // if earlier system is incompatible, make the current system dependent if earlier_system @@ -190,8 +194,7 @@ impl ExecutorStage { } } ThreadLocalExecution::Immediate => { - last_thread_local_index = Some(system_index); - for earlier_system_index in 0..system_index { + for earlier_system_index in prepare_system_index_range.start..system_index { // treat all earlier systems as "incompatible" to ensure we run this thread local system exclusively self.system_dependents[earlier_system_index].push(system_index); self.system_dependencies[system_index].insert(earlier_system_index); @@ -201,7 +204,7 @@ impl ExecutorStage { } } - self.last_archetypes_generation = archetypes_generation; + self.next_thread_local_index += 1; } fn run_ready_systems<'run>( @@ -216,8 +219,8 @@ impl ExecutorStage { let mut all; let mut dependents; let system_index_iter: &mut dyn Iterator = match run_ready_type { - RunReadyType::All => { - all = 0..systems.len(); + RunReadyType::Range(range) => { + all = range; &mut all } RunReadyType::Dependents(system_index) => { @@ -272,14 +275,49 @@ impl ExecutorStage { world: &mut World, resources: &mut Resources, systems: &[Arc>>], + schedule_changed: bool, ) { + // if the schedule has changed, clear executor state / fill it with new defaults + if schedule_changed { + self.system_dependencies.clear(); + self.system_dependencies + .resize_with(systems.len(), || FixedBitSet::with_capacity(systems.len())); + self.thread_local_system_indices = Vec::new(); + + self.system_dependents.clear(); + self.system_dependents.resize(systems.len(), Vec::new()); + + self.finished_systems.grow(systems.len()); + self.running_systems.grow(systems.len()); + + for (system_index, system) in systems.iter().enumerate() { + let system = system.lock().unwrap(); + if system.thread_local_execution() == ThreadLocalExecution::Immediate { + self.thread_local_system_indices.push(system_index); + } + } + } + + self.next_thread_local_index = 0; + self.prepare_to_next_thread_local(world, systems, schedule_changed); + self.finished_systems.clear(); self.running_systems.clear(); let mut run_ready_result = RunReadyResult::Ok; + let run_ready_system_index_range = if let Some(index) = self + .thread_local_system_indices + .get(0) + { + // if there is an upcoming thread local system, run up to (and including) it + 0..(*index + 1) + } else { + // if there are no upcoming thread local systems, run everything right now + 0..systems.len() + }; rayon::scope_fifo(|scope| { run_ready_result = - self.run_ready_systems(systems, RunReadyType::All, scope, world, resources); + self.run_ready_systems(systems, RunReadyType::Range(run_ready_system_index_range), scope, world, resources); }); loop { // if all systems in the stage are finished, break out of the loop @@ -296,7 +334,7 @@ impl ExecutorStage { self.finished_systems.insert(thread_local_index); self.sender.send(thread_local_index).unwrap(); - // TODO: if archetype generation has changed, call "prepare" on all systems after this one + self.prepare_to_next_thread_local(world, systems, schedule_changed); run_ready_result = RunReadyResult::Ok; } else { @@ -335,6 +373,8 @@ impl ExecutorStage { ThreadLocalExecution::Immediate => { /* already ran */ } } } + + self.last_archetypes_generation = world.archetypes_generation(); } } @@ -345,9 +385,10 @@ mod tests { resource::{Res, ResMut, Resources}, schedule::Schedule, system::{IntoQuerySystem, IntoThreadLocalSystem, Query}, + Commands, }; use fixedbitset::FixedBitSet; - use hecs::World; + use hecs::{Entity, World}; use std::sync::{Arc, Mutex}; #[derive(Default)] @@ -355,6 +396,63 @@ mod tests { count: Arc>, } + #[test] + fn cross_stage_archetype_change_prepare() { + let mut world = World::new(); + let mut resources = Resources::default(); + let mut schedule = Schedule::default(); + schedule.add_stage("PreArchetypeChange"); + schedule.add_stage("PostArchetypeChange"); + + fn insert(mut commands: Commands) { + commands.spawn((1u32,)); + } + + fn read(query: Query<&u32>, mut entities: Query) { + for entity in &mut entities.iter() { + // query.get() does a "system permission check" that will fail if the entity is from a + // new archetype which hasnt been "prepared yet" + query.get::(entity).unwrap(); + } + + assert_eq!(1, entities.iter().iter().count()); + } + + schedule.add_system_to_stage("PreArchetypeChange", insert.system()); + schedule.add_system_to_stage("PostArchetypeChange", read.system()); + + let mut executor = ParallelExecutor::default(); + executor.run(&mut schedule, &mut world, &mut resources); + } + + #[test] + fn intra_stage_archetype_change_prepare() { + let mut world = World::new(); + let mut resources = Resources::default(); + let mut schedule = Schedule::default(); + schedule.add_stage("update"); + + fn insert(world: &mut World, _resources: &mut Resources) { + world.spawn((1u32,)); + } + + fn read(query: Query<&u32>, mut entities: Query) { + for entity in &mut entities.iter() { + // query.get() does a "system permission check" that will fail if the entity is from a + // new archetype which hasnt been "prepared yet" + query.get::(entity).unwrap(); + } + + assert_eq!(1, entities.iter().iter().count()); + } + + schedule.add_system_to_stage("update", insert.thread_local_system()); + schedule.add_system_to_stage("update", read.system()); + + let mut executor = ParallelExecutor::default(); + executor.run(&mut schedule, &mut world, &mut resources); + } + #[test] fn schedule() { let mut world = World::new(); @@ -475,7 +573,7 @@ mod tests { world: &mut World, resources: &mut Resources, ) { - executor.prepare(schedule, world); + executor.run(schedule, world, resources); assert_eq!( executor.stages[0].system_dependents, @@ -536,8 +634,6 @@ mod tests { ] ); - executor.run(schedule, world, resources); - let counter = resources.get::().unwrap(); assert_eq!( *counter.count.lock().unwrap(), diff --git a/crates/bevy_ecs/src/system/system.rs b/crates/bevy_ecs/src/system/system.rs index ca3decedac..3b0931cf33 100644 --- a/crates/bevy_ecs/src/system/system.rs +++ b/crates/bevy_ecs/src/system/system.rs @@ -3,7 +3,7 @@ use fixedbitset::FixedBitSet; use hecs::{Access, Query, World}; use std::{any::TypeId, borrow::Cow, collections::HashSet}; -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum ThreadLocalExecution { Immediate, NextFlush,