diff --git a/CREDITS.md b/CREDITS.md index 8353b9cb40..d2ac81cd30 100644 --- a/CREDITS.md +++ b/CREDITS.md @@ -5,6 +5,7 @@ * hecs * legion_transform * wgpu-rs examples +* yaks: ArchetypeSet, borrowed some ideas from their scheduler implementation ## Inspiration diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index cacd4672ff..04bfcf7641 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -12,4 +12,6 @@ profiler = [] [dependencies] hecs = { path = "hecs", features = ["macros", "serialize"] } rand = "0.7.2" -rayon = "1.3" \ No newline at end of file +rayon = "1.3" +crossbeam-channel = "0.4.2" +fixedbitset = "0.3.0" \ No newline at end of file diff --git a/crates/bevy_ecs/src/executor.rs b/crates/bevy_ecs/src/executor.rs new file mode 100644 index 0000000000..637a867254 --- /dev/null +++ b/crates/bevy_ecs/src/executor.rs @@ -0,0 +1,369 @@ +use crate::{system::ThreadLocalExecution, Resources, Schedule, System}; +use crossbeam_channel::{Receiver, Sender}; +use fixedbitset::FixedBitSet; +use hecs::{Access, Query, World}; +use rayon::ScopeFifo; +use std::sync::{Arc, Mutex}; + +#[derive(Default)] +pub struct Executor { + stages: Vec, +} + +impl Executor { + pub fn prepare(&mut self, schedule: &mut Schedule, world: &World) { + // TODO: if world archetype generation hasnt changed, dont update here + + let mut executor_stages = vec![ExecutorStage::default(); schedule.stage_order.len()]; + for (stage_index, stage_name) in schedule.stage_order.iter().enumerate() { + let executor_stage = &mut executor_stages[stage_index]; + // ensure finished dependencies has the required number of bits + if let Some(systems) = schedule.stages.get(stage_name) { + executor_stage.prepare(world, systems); + } + } + + self.stages = executor_stages; + } + + pub fn run(&mut self, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) { + for (stage_name, executor_stage) in schedule.stage_order.iter().zip(self.stages.iter_mut()) + { + if let Some(stage_systems) = schedule.stages.get_mut(stage_name) { + executor_stage.run(world, resources, stage_systems); + } + } + } +} + +#[derive(Clone)] +pub struct ExecutorStage { + /// each system's set of dependencies + system_dependencies: Vec, + /// each system's dependents (the systems that can't run until this system has run) + system_dependents: Vec>, + + /// the currently finished systems + finished_systems: FixedBitSet, + running_systems: FixedBitSet, + + sender: Sender, + receiver: Receiver, +} + +impl Default for ExecutorStage { + fn default() -> Self { + let (sender, receiver) = crossbeam_channel::unbounded(); + Self { + system_dependents: Default::default(), + system_dependencies: Default::default(), + finished_systems: Default::default(), + running_systems: Default::default(), + sender, + receiver, + } + } +} + +impl ExecutorStage { + pub fn prepare(&mut self, world: &World, systems: &Vec>>>) { + self.system_dependencies = vec![FixedBitSet::with_capacity(systems.len()); systems.len()]; + self.system_dependents = vec![Vec::new(); systems.len()]; + self.finished_systems.grow(systems.len()); + self.running_systems.grow(systems.len()); + + // update each system's archetype access to latest world archetypes + for system in systems.iter() { + let mut system = system.lock().unwrap(); + system.update_archetype_access(world); + } + + let mut current_archetype_access = ArchetypeAccess::default(); + for (system_index, system) in systems.iter().enumerate() { + let system = system.lock().unwrap(); + if let Some(archetype_access) = system.get_archetype_access() { + // TODO: check if thread local and add full sync + // if any system before this one conflicts, check all systems that came before for compatibility + if current_archetype_access.is_compatible(archetype_access) == false { + for earlier_system_index in 0..system_index { + let earlier_system = systems[earlier_system_index].lock().unwrap(); + if let Some(earlier_archetype_access) = + earlier_system.get_archetype_access() + { + // if earlier system is incompatible, make the current system dependent + if earlier_archetype_access.is_compatible(archetype_access) == false { + self.system_dependents[earlier_system_index].push(system_index); + self.system_dependencies[system_index].insert(earlier_system_index); + } + } + } + } + + current_archetype_access.union(archetype_access); + } + } + } + + pub fn run_ready_systems<'run>( + &mut self, + systems: &[Arc>>], + scope: &ScopeFifo<'run>, + world: &'run World, + resources: &'run Resources, + ) { + for i in 0..systems.len() { + Self::try_run_system( + systems, + i, + &mut self.running_systems, + &self.finished_systems, + &self.system_dependencies, + &self.sender, + scope, + world, + resources, + ); + } + } + + #[inline] + pub fn try_run_system<'run>( + systems: &[Arc>>], + system_index: usize, + running_systems: &mut FixedBitSet, + finished_systems: &FixedBitSet, + system_dependencies: &[FixedBitSet], + sender: &Sender, + scope: &ScopeFifo<'run>, + world: &'run World, + resources: &'run Resources, + ) { + if running_systems.contains(system_index) { + return; + } + + // if all system dependencies are finished, queue up the system to run + if system_dependencies[system_index].is_subset(&finished_systems) { + let system = systems[system_index].clone(); + let sender = sender.clone(); + running_systems.insert(system_index); + scope.spawn_fifo(move |_| { + let mut system = system.lock().unwrap(); + system.run(world, resources); + sender.send(system_index).unwrap(); + }) + } + } + + pub fn run( + &mut self, + world: &mut World, + resources: &mut Resources, + systems: &[Arc>>], + ) { + self.finished_systems.clear(); + self.running_systems.clear(); + { + let world = &*world; + let resources = &*resources; + + rayon::scope_fifo(move |scope| { + self.run_ready_systems(systems, scope, world, resources); + loop { + if self.finished_systems.count_ones(..) == systems.len() { + break; + } + + let finished_system = self.receiver.recv().unwrap(); + self.finished_systems.insert(finished_system); + for dependent_system in self.system_dependents[finished_system].iter() { + Self::try_run_system( + systems, + *dependent_system, + &mut self.running_systems, + &self.finished_systems, + &self.system_dependencies, + &self.sender, + scope, + world, + resources, + ); + } + } + }); + } + + // "flush" + // NOTE: when this is made parallel a full sync is required here + for system in systems.iter() { + let mut system = system.lock().unwrap(); + match system.thread_local_execution() { + ThreadLocalExecution::NextFlush => system.run_thread_local(world, resources), + ThreadLocalExecution::Immediate => { + // TODO: this should happen immediately after thread local systems + system.run_thread_local(world, resources) + } + } + } + } +} + +// credit to Ratysz from the Yaks codebase +#[derive(Default)] +pub struct ArchetypeAccess { + pub immutable: FixedBitSet, + pub mutable: FixedBitSet, +} + +impl ArchetypeAccess { + pub fn is_compatible(&self, other: &ArchetypeAccess) -> bool { + self.mutable.is_disjoint(&other.mutable) + && self.mutable.is_disjoint(&other.immutable) + && self.immutable.is_disjoint(&other.mutable) + } + + pub fn union(&mut self, other: &ArchetypeAccess) { + self.mutable.union_with(&other.mutable); + self.immutable.union_with(&other.immutable); + } + + pub fn set_bits_for_query(&mut self, world: &World) + where + Q: Query, + { + self.immutable.clear(); + self.mutable.clear(); + let iterator = world.archetypes(); + let bits = iterator.len(); + self.immutable.grow(bits); + self.mutable.grow(bits); + iterator + .enumerate() + .filter_map(|(index, archetype)| archetype.access::().map(|access| (index, access))) + .for_each(|(archetype, access)| match access { + Access::Read => self.immutable.set(archetype, true), + Access::Write => self.mutable.set(archetype, true), + Access::Iterate => (), + }); + } +} + +#[cfg(test)] +mod tests { + use super::Executor; + use crate::{IntoQuerySystem, Query, Res, Resources, Schedule, World}; + use fixedbitset::FixedBitSet; + use std::sync::{Arc, Mutex}; + + #[derive(Default)] + struct Counter { + count: Arc>, + } + + fn read_u32(counter: Res, _query: Query<&u32>) { + let mut count = counter.count.lock().unwrap(); + assert!( + *count < 2, + "read_32 should run be one of the first two systems to run" + ); + *count += 1; + } + + fn write_float(counter: Res, _query: Query<&f32>) { + let mut count = counter.count.lock().unwrap(); + assert!( + *count < 2, + "write_float should run be one of the first two systems to run" + ); + *count += 1; + } + + fn read_u32_write_u64(counter: Res, _query: Query<(&u32, &mut u64)>) { + let mut count = counter.count.lock().unwrap(); + assert_eq!( + *count, 2, + "read_u32_write_u64 should always be the third system to run" + ); + *count += 1; + } + + fn read_u64(counter: Res, _query: Query<&u64>) { + let mut count = counter.count.lock().unwrap(); + assert_eq!( + *count, 3, + "read_u64 should always be the fourth system to run" + ); + *count += 1; + } + + fn write_u64(counter: Res, _query: Query<&mut u64>) { + let mut count = counter.count.lock().unwrap(); + assert_eq!( + *count, 4, + "write_u64 should always be the fifth system to run" + ); + *count += 1; + } + + #[test] + fn schedule() { + let mut world = World::new(); + let mut resources = Resources::default(); + resources.insert(Counter::default()); + + world.spawn((1.0f32,)); + world.spawn((1u32, 1u64)); + world.spawn((2u32,)); + + let mut schedule = Schedule::default(); + schedule.add_stage("A"); + schedule.add_stage("B"); + + schedule.add_system_to_stage("A", read_u32.system()); + schedule.add_system_to_stage("A", write_float.system()); + schedule.add_system_to_stage("A", read_u32_write_u64.system()); + schedule.add_system_to_stage("A", read_u64.system()); + schedule.add_system_to_stage("B", write_u64.system()); + + let mut executor = Executor::default(); + executor.prepare(&mut schedule, &world); + + assert_eq!( + executor.stages[0].system_dependents, + vec![vec![2], vec![], vec![3], vec![]] + ); + assert_eq!(executor.stages[1].system_dependents, vec![vec![]]); + + let stage_0_len = executor.stages[0].system_dependencies.len(); + let mut read_u32_write_u64_deps = FixedBitSet::with_capacity(stage_0_len); + read_u32_write_u64_deps.insert(0); + let mut read_u64_deps = FixedBitSet::with_capacity(stage_0_len); + read_u64_deps.insert(2); + + assert_eq!( + executor.stages[0].system_dependencies, + vec![ + FixedBitSet::with_capacity(stage_0_len), + FixedBitSet::with_capacity(stage_0_len), + read_u32_write_u64_deps, + read_u64_deps, + ] + ); + + assert_eq!( + executor.stages[1].system_dependencies, + vec![ + FixedBitSet::with_capacity(1), + ] + ); + + executor.run(&mut schedule, &mut world, &mut resources); + + let counter = resources.get::().unwrap(); + assert_eq!( + *counter.count.lock().unwrap(), + 5, + "counter should have been incremented once for each system" + ); + } +} diff --git a/crates/bevy_ecs/src/into_system.rs b/crates/bevy_ecs/src/into_system.rs index e7372f1802..48b3980ede 100644 --- a/crates/bevy_ecs/src/into_system.rs +++ b/crates/bevy_ecs/src/into_system.rs @@ -1,7 +1,7 @@ use crate::{ resource_query::{FetchResource, ResourceQuery, UnsafeClone}, system::{System, SystemId, ThreadLocalExecution}, - Commands, Resources, Res, + Commands, Resources, executor::ArchetypeAccess, }; use core::marker::PhantomData; use hecs::{ @@ -9,10 +9,11 @@ use hecs::{ }; use std::borrow::Cow; -pub struct SystemFn +pub struct SystemFn where F: FnMut(Commands, &World, &Resources) + Send + Sync, Init: FnMut(&mut Resources) + Send + Sync, + SetArchetypeAccess: FnMut(&World, &mut ArchetypeAccess) + Send + Sync, { pub func: F, pub init_func: Init, @@ -20,18 +21,29 @@ where pub thread_local_execution: ThreadLocalExecution, pub name: Cow<'static, str>, pub id: SystemId, + pub archetype_access: ArchetypeAccess, + pub set_archetype_access: SetArchetypeAccess, // TODO: add dependency info here } -impl System for SystemFn +impl System for SystemFn where F: FnMut(Commands, &World, &Resources) + Send + Sync, Init: FnMut(&mut Resources) + Send + Sync, + SetArchetypeAccess: FnMut(&World, &mut ArchetypeAccess) + Send + Sync, { fn name(&self) -> Cow<'static, str> { self.name.clone() } + fn update_archetype_access(&mut self, world: &World) { + (self.set_archetype_access)(world, &mut self.archetype_access); + } + + fn get_archetype_access(&self) -> Option<&ArchetypeAccess> { + Some(&self.archetype_access) + } + fn thread_local_execution(&self) -> ThreadLocalExecution { self.thread_local_execution } @@ -95,6 +107,12 @@ macro_rules! impl_into_foreach_system { init_func: move |resources| { <($($resource,)*)>::initialize(resources, Some(id)); }, + archetype_access: ArchetypeAccess::default(), + set_archetype_access: |world, archetype_access| { + for archetype in world.archetypes() { + archetype_access.set_bits_for_query::<($($component,)*)>(world); + } + }, }) } } @@ -165,6 +183,12 @@ macro_rules! impl_into_query_system { init_func: move |resources| { <($($resource,)*)>::initialize(resources, Some(id)); }, + archetype_access: ArchetypeAccess::default(), + set_archetype_access: |world, archetype_access| { + for archetype in world.archetypes() { + $(archetype_access.set_bits_for_query::<$query>(world);)* + } + }, }) } } @@ -311,4 +335,9 @@ where fn id(&self) -> SystemId { self.id } + fn update_archetype_access(&mut self, _world: &World) { + } + fn get_archetype_access(&self) -> Option<&ArchetypeAccess> { + None + } } diff --git a/crates/bevy_ecs/src/lib.rs b/crates/bevy_ecs/src/lib.rs index 16c4624501..34708e21d2 100644 --- a/crates/bevy_ecs/src/lib.rs +++ b/crates/bevy_ecs/src/lib.rs @@ -1,6 +1,7 @@ pub use hecs::{Query as HecsQuery, *}; mod commands; +mod executor; mod into_system; #[cfg(feature = "profiler")] pub mod profiler; @@ -15,5 +16,6 @@ pub use into_system::{IntoForEachSystem, IntoQuerySystem, Query, ThreadLocalSyst pub use resource_query::{FetchResource, Local, Res, ResMut, ResourceQuery}; pub use resources::{FromResources, Resource, Resources}; pub use schedule::Schedule; +pub use executor::Executor; pub use system::{System, SystemId}; pub use world_builder::{WorldBuilder, WorldBuilderSource}; diff --git a/crates/bevy_ecs/src/schedule.rs b/crates/bevy_ecs/src/schedule.rs index 0fd2fccc5f..73842f08de 100644 --- a/crates/bevy_ecs/src/schedule.rs +++ b/crates/bevy_ecs/src/schedule.rs @@ -2,17 +2,16 @@ use crate::{ system::{System, ThreadLocalExecution}, Resources, SystemId, World, }; -use rayon::prelude::*; use std::{ borrow::Cow, - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet}, sync::{Mutex, Arc}, }; #[derive(Default)] pub struct Schedule { - stages: HashMap, Vec>>, - stage_order: Vec>, - system_ids: HashSet, + pub(crate) stages: HashMap, Vec>>>>, + pub(crate) stage_order: Vec>, + pub(crate) system_ids: HashSet, is_dirty: bool, } @@ -91,7 +90,7 @@ impl Schedule { ); } self.system_ids.insert(system.id()); - systems.push(system); + systems.push(Arc::new(Mutex::new(system))); self.is_dirty = true; self @@ -103,6 +102,7 @@ impl Schedule { for system in stage_systems.iter_mut() { #[cfg(feature = "profiler")] crate::profiler::profiler_start(resources, system.name().clone()); + let mut system = system.lock().unwrap(); match system.thread_local_execution() { ThreadLocalExecution::NextFlush => system.run(world, resources), ThreadLocalExecution::Immediate => { @@ -118,6 +118,7 @@ impl Schedule { // "flush" // NOTE: when this is made parallel a full sync is required here for system in stage_systems.iter_mut() { + let mut system = system.lock().unwrap(); match system.thread_local_execution() { ThreadLocalExecution::NextFlush => { system.run_thread_local(world, resources) @@ -129,35 +130,6 @@ impl Schedule { } } - pub fn dumb_par_run(&mut self, world: &mut World, resources: &mut Resources) { - for stage_name in self.stage_order.iter() { - if let Some(stage_systems) = self.stages.get_mut(stage_name) { - stage_systems.par_iter_mut().for_each(|system| { - match system.thread_local_execution() { - ThreadLocalExecution::NextFlush => system.run(world, resources), - ThreadLocalExecution::Immediate => { - system.run(world, resources); - } - } - }); - - // "flush" - // NOTE: when this is made parallel a full sync is required here - for system in stage_systems.iter_mut() { - match system.thread_local_execution() { - ThreadLocalExecution::NextFlush => { - system.run_thread_local(world, resources) - } - ThreadLocalExecution::Immediate => { - // TODO: this should happen immediately after thread local systems - system.run_thread_local(world, resources) - } - } - } - } - } - } - pub fn initialize(&mut self, resources: &mut Resources) { if !self.is_dirty { return; @@ -165,32 +137,11 @@ impl Schedule { for stage in self.stages.values_mut() { for system in stage.iter_mut() { + let mut system = system.lock().unwrap(); system.initialize(resources); } } self.is_dirty = false; } -} - -// #[cfg(test)] -// mod tests { -// use crate::{Resources, Schedule, World}; -// use crate::{IntoForEachSystem, IntoQuerySystem}; - -// #[test] -// fn schedule() { -// let mut world = World::new(); -// let mut resources = Resources::default(); - -// world.spawn((1u32, 2u64)); - -// let mut schedule = Schedule::default(); -// schedule.add_stage("A"); -// schedule.add_stage("B"); - -// let xy_system = (|_x: &u32, _y: &mut u64| { - -// }).system(); -// } -// } +} \ No newline at end of file diff --git a/crates/bevy_ecs/src/system.rs b/crates/bevy_ecs/src/system.rs index aa4f15a92d..7965a4c45a 100644 --- a/crates/bevy_ecs/src/system.rs +++ b/crates/bevy_ecs/src/system.rs @@ -1,4 +1,4 @@ -use crate::{Resources, World}; +use crate::{Resources, World, executor::ArchetypeAccess}; use std::borrow::Cow; #[derive(Copy, Clone)] @@ -19,6 +19,8 @@ impl SystemId { pub trait System: Send + Sync { fn name(&self) -> Cow<'static, str>; fn id(&self) -> SystemId; + fn update_archetype_access(&mut self, world: &World); + fn get_archetype_access(&self) -> Option<&ArchetypeAccess>; fn thread_local_execution(&self) -> ThreadLocalExecution; fn run(&mut self, world: &World, resources: &Resources); fn run_thread_local(&mut self, world: &mut World, resources: &mut Resources);