From 4712e96aa897b00f89d52d9ac89ffa3d57634142 Mon Sep 17 00:00:00 2001 From: Carter Anderson Date: Wed, 15 Jul 2020 17:20:36 -0700 Subject: [PATCH] ecs: make parallel executor resource-aware --- crates/bevy_ecs/src/into_system.rs | 12 +- crates/bevy_ecs/src/lib.rs | 2 +- crates/bevy_ecs/src/parallel_executor.rs | 460 ++++++++++++++--------- crates/bevy_ecs/src/resource_query.rs | 25 +- crates/bevy_ecs/src/schedule.rs | 14 +- crates/bevy_ecs/src/system.rs | 43 ++- crates/bevy_render/Cargo.toml | 2 +- crates/bevy_render/src/draw.rs | 20 +- 8 files changed, 395 insertions(+), 183 deletions(-) diff --git a/crates/bevy_ecs/src/into_system.rs b/crates/bevy_ecs/src/into_system.rs index 8004640176..53e2b7e001 100644 --- a/crates/bevy_ecs/src/into_system.rs +++ b/crates/bevy_ecs/src/into_system.rs @@ -1,7 +1,7 @@ use crate::{ resource_query::{FetchResource, ResourceQuery, UnsafeClone}, system::{ArchetypeAccess, System, SystemId, ThreadLocalExecution}, - Commands, Resources, + Commands, Resources, TypeAccess, }; use core::marker::PhantomData; use hecs::{ @@ -22,6 +22,7 @@ where pub thread_local_func: ThreadLocalF, pub init_func: Init, pub thread_local_execution: ThreadLocalExecution, + pub resource_access: TypeAccess, pub name: Cow<'static, str>, pub id: SystemId, pub archetype_access: ArchetypeAccess, @@ -45,9 +46,13 @@ where (self.set_archetype_access)(world, &mut self.archetype_access, &mut self.state); } - fn get_archetype_access(&self) -> &ArchetypeAccess { + fn archetype_access(&self) -> &ArchetypeAccess { &self.archetype_access } + + fn resource_access(&self) -> &TypeAccess { + &self.resource_access + } fn thread_local_execution(&self) -> ThreadLocalExecution { self.thread_local_execution @@ -115,6 +120,7 @@ macro_rules! impl_into_foreach_system { init_func: move |resources| { <($($resource,)*)>::initialize(resources, Some(id)); }, + resource_access: <<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::access(), archetype_access: ArchetypeAccess::default(), set_archetype_access: |world, archetype_access, _state| { archetype_access.clear(); @@ -253,6 +259,7 @@ macro_rules! impl_into_query_system { init_func: move |resources| { <($($resource,)*)>::initialize(resources, Some(id)); }, + resource_access: <<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::access(), archetype_access: ArchetypeAccess::default(), set_archetype_access: |world, archetype_access, state| { archetype_access.clear(); @@ -378,6 +385,7 @@ where thread_local_execution: ThreadLocalExecution::Immediate, name: core::any::type_name::().into(), id: SystemId::new(), + resource_access: TypeAccess::default(), archetype_access: ArchetypeAccess::default(), }) } diff --git a/crates/bevy_ecs/src/lib.rs b/crates/bevy_ecs/src/lib.rs index 4aad2160d0..6115994cea 100644 --- a/crates/bevy_ecs/src/lib.rs +++ b/crates/bevy_ecs/src/lib.rs @@ -17,5 +17,5 @@ pub use resource_query::{FetchResource, Local, Res, ResMut, ResourceQuery}; pub use resources::{FromResources, Resource, Resources}; pub use schedule::Schedule; pub use parallel_executor::ParallelExecutor; -pub use system::{System, SystemId}; +pub use system::{System, SystemId, TypeAccess, ArchetypeAccess}; pub use world_builder::{WorldBuilder, WorldBuilderSource}; diff --git a/crates/bevy_ecs/src/parallel_executor.rs b/crates/bevy_ecs/src/parallel_executor.rs index d5771d03fd..cc5a36e0ff 100644 --- a/crates/bevy_ecs/src/parallel_executor.rs +++ b/crates/bevy_ecs/src/parallel_executor.rs @@ -1,7 +1,10 @@ -use crate::{system::{ArchetypeAccess, ThreadLocalExecution}, Resources, Schedule, System}; +use crate::{ + system::{ArchetypeAccess, ThreadLocalExecution}, + Resources, Schedule, System, TypeAccess, +}; use crossbeam_channel::{Receiver, Sender}; use fixedbitset::FixedBitSet; -use hecs::{World}; +use hecs::World; use rayon::ScopeFifo; use std::sync::{Arc, Mutex}; @@ -12,14 +15,12 @@ pub struct ParallelExecutor { impl ParallelExecutor { pub fn prepare(&mut self, schedule: &mut Schedule, world: &World) { - // TODO: if world archetype generation hasnt changed, dont update here - let mut executor_stages = vec![ExecutorStage::default(); schedule.stage_order.len()]; + let schedule_generation = schedule.generation(); for (stage_index, stage_name) in schedule.stage_order.iter().enumerate() { let executor_stage = &mut executor_stages[stage_index]; - // ensure finished dependencies has the required number of bits if let Some(systems) = schedule.stages.get(stage_name) { - executor_stage.prepare(world, systems); + executor_stage.prepare(world, systems, schedule_generation); } } @@ -43,13 +44,13 @@ pub struct ExecutorStage { system_dependencies: Vec, /// each system's dependents (the systems that can't run until this system has run) system_dependents: Vec>, - /// the currently finished systems finished_systems: FixedBitSet, running_systems: FixedBitSet, sender: Sender, receiver: Receiver, + last_prepare_schedule_generation: usize, } impl Default for ExecutorStage { @@ -62,6 +63,7 @@ impl Default for ExecutorStage { running_systems: Default::default(), sender, receiver, + last_prepare_schedule_generation: usize::MAX, // MAX forces prepare to run the first time } } } @@ -77,63 +79,104 @@ enum RunReadyType { } impl ExecutorStage { - pub fn prepare(&mut self, world: &World, systems: &Vec>>>) { - self.system_dependencies = vec![FixedBitSet::with_capacity(systems.len()); systems.len()]; - self.system_dependents = vec![Vec::new(); systems.len()]; - self.finished_systems.grow(systems.len()); - self.running_systems.grow(systems.len()); + // TODO: add a start_index parameter here + pub fn prepare( + &mut self, + world: &World, + systems: &Vec>>>, + schedule_generation: usize, + // last_world_generation + // last_world_generation_start_index <- for cases where we do a partial update midway through execution (then need to update everything that came before in the next execution) + ) { + let schedule_changed = self.last_prepare_schedule_generation != schedule_generation; - // update each system's archetype access to latest world archetypes - for system in systems.iter() { - let mut system = system.lock().unwrap(); - system.update_archetype_access(world); + // if the schedule has changed, clear executor state / fill it with new defaults + if schedule_changed { + self.system_dependencies.clear(); + self.system_dependencies + .resize_with(systems.len(), || FixedBitSet::with_capacity(systems.len())); + + self.system_dependents.clear(); + self.system_dependents.resize(systems.len(), Vec::new()); + + self.finished_systems.grow(systems.len()); + self.running_systems.grow(systems.len()); } - let mut current_archetype_access = ArchetypeAccess::default(); - let mut last_thread_local_index: Option = None; - for (system_index, system) in systems.iter().enumerate() { - let system = system.lock().unwrap(); - let archetype_access = system.get_archetype_access(); - match system.thread_local_execution() { - ThreadLocalExecution::NextFlush => { - // if any system before this one conflicts, check all systems that came before for compatibility - if current_archetype_access.is_compatible(archetype_access) == false { - for earlier_system_index in 0..system_index { - let earlier_system = systems[earlier_system_index].lock().unwrap(); + // TODO: check archetype generation here + let world_generation_changed = true; - // ignore "immediate" thread local systems, we handle them separately - if let ThreadLocalExecution::Immediate = - earlier_system.thread_local_execution() - { - continue; - } + if world_generation_changed { + // update each system's archetype access to latest world archetypes + for system in systems.iter() { + let mut system = system.lock().unwrap(); + system.update_archetype_access(world); + } + } - let earlier_archetype_access = earlier_system.get_archetype_access(); - // if earlier system is incompatible, make the current system dependent - if earlier_archetype_access.is_compatible(archetype_access) == false { - self.system_dependents[earlier_system_index].push(system_index); - self.system_dependencies[system_index].insert(earlier_system_index); + if schedule_changed || world_generation_changed { + // calculate dependencies between systems and build execution order + let mut current_archetype_access = ArchetypeAccess::default(); + let mut current_resource_access = TypeAccess::default(); + let mut last_thread_local_index: Option = None; + for (system_index, system) in systems.iter().enumerate() { + let system = system.lock().unwrap(); + let archetype_access = system.archetype_access(); + match system.thread_local_execution() { + ThreadLocalExecution::NextFlush => { + let resource_access = system.resource_access(); + // if any system before this one conflicts, check all systems that came before for compatibility + if current_archetype_access.is_compatible(archetype_access) == false + || current_resource_access.is_compatible(resource_access) == false + { + for earlier_system_index in 0..system_index { + let earlier_system = systems[earlier_system_index].lock().unwrap(); + + // ignore "immediate" thread local systems, we handle them separately + if let ThreadLocalExecution::Immediate = + earlier_system.thread_local_execution() + { + continue; + } + + // if earlier system is incompatible, make the current system dependent + if earlier_system + .archetype_access() + .is_compatible(archetype_access) + == false + || earlier_system + .resource_access() + .is_compatible(resource_access) + == false + { + self.system_dependents[earlier_system_index].push(system_index); + self.system_dependencies[system_index] + .insert(earlier_system_index); + } } } - } - current_archetype_access.union(archetype_access); + current_archetype_access.union(archetype_access); + current_resource_access.union(resource_access); - if let Some(last_thread_local_index) = last_thread_local_index { - self.system_dependents[last_thread_local_index].push(system_index); - self.system_dependencies[system_index].insert(last_thread_local_index); + if let Some(last_thread_local_index) = last_thread_local_index { + self.system_dependents[last_thread_local_index].push(system_index); + self.system_dependencies[system_index].insert(last_thread_local_index); + } } - } - ThreadLocalExecution::Immediate => { - last_thread_local_index = Some(system_index); - for earlier_system_index in 0..system_index { - // treat all earlier systems as "incompatible" to ensure we run this thread local system exclusively - self.system_dependents[earlier_system_index].push(system_index); - self.system_dependencies[system_index].insert(earlier_system_index); + ThreadLocalExecution::Immediate => { + last_thread_local_index = Some(system_index); + for earlier_system_index in 0..system_index { + // treat all earlier systems as "incompatible" to ensure we run this thread local system exclusively + self.system_dependents[earlier_system_index].push(system_index); + self.system_dependencies[system_index].insert(earlier_system_index); + } } } } } + + self.last_prepare_schedule_generation = schedule_generation; } fn run_ready_systems<'run>( @@ -207,6 +250,7 @@ impl ExecutorStage { ) { self.finished_systems.clear(); self.running_systems.clear(); + let mut run_ready_result = RunReadyResult::Ok; rayon::scope_fifo(|scope| { run_ready_result = @@ -228,7 +272,7 @@ impl ExecutorStage { self.sender.send(thread_local_index).unwrap(); // TODO: if archetype generation has changed, call "prepare" on all systems after this one - + run_ready_result = RunReadyResult::Ok; } else { // wait for a system to finish, then run its dependents @@ -272,7 +316,7 @@ impl ExecutorStage { #[cfg(test)] mod tests { use super::ParallelExecutor; - use crate::{IntoQuerySystem, IntoThreadLocalSystem, Query, Res, Resources, Schedule, World}; + use crate::{IntoQuerySystem, IntoThreadLocalSystem, Query, Res, Resources, Schedule, World, ResMut}; use fixedbitset::FixedBitSet; use std::sync::{Arc, Mutex}; @@ -281,148 +325,224 @@ mod tests { count: Arc>, } - fn read_u32(counter: Res, _query: Query<&u32>) { - let mut count = counter.count.lock().unwrap(); - assert!( - *count < 2, - "read_32 should be one of the first two systems to run" - ); - *count += 1; - } - - fn write_float(counter: Res, _query: Query<&f32>) { - let mut count = counter.count.lock().unwrap(); - assert!( - *count < 2, - "write_float should be one of the first two systems to run" - ); - *count += 1; - } - - fn read_u32_write_u64(counter: Res, _query: Query<(&u32, &mut u64)>) { - let mut count = counter.count.lock().unwrap(); - assert_eq!( - *count, 2, - "read_u32_write_u64 should always be the third system to run" - ); - *count += 1; - } - - fn read_u64(counter: Res, _query: Query<&u64>) { - let mut count = counter.count.lock().unwrap(); - assert_eq!( - *count, 3, - "read_u64 should always be the fourth system to run" - ); - *count += 1; - } - - fn write_u64(counter: Res, _query: Query<&mut u64>) { - let mut count = counter.count.lock().unwrap(); - assert_eq!( - *count, 4, - "write_u64 should always be the fifth system to run" - ); - *count += 1; - } - - fn thread_local_system(_world: &mut World, resources: &mut Resources) { - let counter = resources.get::().unwrap(); - let mut count = counter.count.lock().unwrap(); - assert_eq!( - *count, 5, - "thread_local_system should always be the sixth system to run" - ); - *count += 1; - } - - fn write_f32(counter: Res, _query: Query<&mut f32>) { - let mut count = counter.count.lock().unwrap(); - assert_eq!( - *count, 6, - "write_f32 should always be the seventh system to run" - ); - *count += 1; - } - #[test] fn schedule() { let mut world = World::new(); let mut resources = Resources::default(); resources.insert(Counter::default()); + resources.insert(1.0f64); + resources.insert(2isize); world.spawn((1.0f32,)); world.spawn((1u32, 1u64)); world.spawn((2u32,)); let mut schedule = Schedule::default(); - schedule.add_stage("A"); - schedule.add_stage("B"); + schedule.add_stage("A"); // component queries + schedule.add_stage("B"); // thread local + schedule.add_stage("C"); // resources + + // A systems + + fn read_u32(counter: Res, _query: Query<&u32>) { + let mut count = counter.count.lock().unwrap(); + assert!( + *count < 2, + "should be one of the first two systems to run" + ); + *count += 1; + } + + fn write_float(counter: Res, _query: Query<&f32>) { + let mut count = counter.count.lock().unwrap(); + assert!( + *count < 2, + "should be one of the first two systems to run" + ); + *count += 1; + } + + fn read_u32_write_u64(counter: Res, _query: Query<(&u32, &mut u64)>) { + let mut count = counter.count.lock().unwrap(); + assert_eq!( + *count, 2, + "should always be the 3rd system to run" + ); + *count += 1; + } + + fn read_u64(counter: Res, _query: Query<&u64>) { + let mut count = counter.count.lock().unwrap(); + assert_eq!( + *count, 3, + "should always be the 4th system to run" + ); + *count += 1; + } schedule.add_system_to_stage("A", read_u32.system()); schedule.add_system_to_stage("A", write_float.system()); schedule.add_system_to_stage("A", read_u32_write_u64.system()); schedule.add_system_to_stage("A", read_u64.system()); + + // B systems + + fn write_u64(counter: Res, _query: Query<&mut u64>) { + let mut count = counter.count.lock().unwrap(); + assert_eq!( + *count, 4, + "should always be the 5th system to run" + ); + *count += 1; + } + + fn thread_local_system(_world: &mut World, resources: &mut Resources) { + let counter = resources.get::().unwrap(); + let mut count = counter.count.lock().unwrap(); + assert_eq!( + *count, 5, + "should always be the 6th system to run" + ); + *count += 1; + } + + fn write_f32(counter: Res, _query: Query<&mut f32>) { + let mut count = counter.count.lock().unwrap(); + assert_eq!( + *count, 6, + "should always be the 7th system to run" + ); + *count += 1; + } + schedule.add_system_to_stage("B", write_u64.system()); schedule.add_system_to_stage("B", thread_local_system.thread_local_system()); schedule.add_system_to_stage("B", write_f32.system()); + + // C systems + + fn read_f64_res(counter: Res, _f64_res: Res) { + let mut count = counter.count.lock().unwrap(); + assert!( + 7 == *count || *count == 8, + "should always be the 8th or 9th system to run" + ); + *count += 1; + } + + fn read_isize_res(counter: Res, _isize_res: Res) { + let mut count = counter.count.lock().unwrap(); + assert!( + 7 == *count || *count == 8, + "should always be the 8th or 9th system to run" + ); + *count += 1; + } + + fn read_isize_write_f64_res(counter: Res, _isize_res: Res, _f64_res: ResMut) { + let mut count = counter.count.lock().unwrap(); + assert_eq!( + *count, 9, + "should always be the 10th system to run" + ); + *count += 1; + } + + fn write_f64_res(counter: Res, _f64_res: ResMut) { + let mut count = counter.count.lock().unwrap(); + assert_eq!( + *count, 10, + "should always be the 11th system to run" + ); + *count += 1; + } + + schedule.add_system_to_stage("C", read_f64_res.system()); + schedule.add_system_to_stage("C", read_isize_res.system()); + schedule.add_system_to_stage("C", read_isize_write_f64_res.system()); + schedule.add_system_to_stage("C", write_f64_res.system()); + + fn run_executor_and_validate( + executor: &mut ParallelExecutor, + schedule: &mut Schedule, + world: &mut World, + resources: &mut Resources, + ) { + executor.prepare(schedule, world); + + assert_eq!( + executor.stages[0].system_dependents, + vec![vec![2], vec![], vec![3], vec![]] + ); + assert_eq!( + executor.stages[1].system_dependents, + vec![vec![1], vec![2], vec![]] + ); + assert_eq!( + executor.stages[2].system_dependents, + vec![vec![2], vec![2], vec![3], vec![]] + ); + + let stage_0_len = executor.stages[0].system_dependencies.len(); + let mut read_u32_write_u64_deps = FixedBitSet::with_capacity(stage_0_len); + read_u32_write_u64_deps.insert(0); + let mut read_u64_deps = FixedBitSet::with_capacity(stage_0_len); + read_u64_deps.insert(2); + + assert_eq!( + executor.stages[0].system_dependencies, + vec![ + FixedBitSet::with_capacity(stage_0_len), + FixedBitSet::with_capacity(stage_0_len), + read_u32_write_u64_deps, + read_u64_deps, + ] + ); + + let stage_1_len = executor.stages[1].system_dependencies.len(); + let mut thread_local_deps = FixedBitSet::with_capacity(stage_1_len); + thread_local_deps.insert(0); + let mut write_f64_deps = FixedBitSet::with_capacity(stage_1_len); + write_f64_deps.insert(1); + assert_eq!( + executor.stages[1].system_dependencies, + vec![ + FixedBitSet::with_capacity(stage_1_len), + thread_local_deps, + write_f64_deps + ] + ); + + let stage_2_len = executor.stages[2].system_dependencies.len(); + let mut read_isize_write_f64_res_deps = FixedBitSet::with_capacity(stage_2_len); + read_isize_write_f64_res_deps.insert(0); + let mut write_f64_res_deps = FixedBitSet::with_capacity(stage_2_len); + write_f64_res_deps.insert(2); + assert_eq!( + executor.stages[2].system_dependencies, + vec![ + FixedBitSet::with_capacity(stage_2_len), + FixedBitSet::with_capacity(stage_2_len), + read_isize_write_f64_res_deps, + write_f64_res_deps + ] + ); + + executor.run(schedule, world, resources); + + let counter = resources.get::().unwrap(); + assert_eq!( + *counter.count.lock().unwrap(), + 11, + "counter should have been incremented once for each system" + ); + } + let mut executor = ParallelExecutor::default(); run_executor_and_validate(&mut executor, &mut schedule, &mut world, &mut resources); // run again (with counter reset) to ensure executor works correctly across runs *resources.get::().unwrap().count.lock().unwrap() = 0; run_executor_and_validate(&mut executor, &mut schedule, &mut world, &mut resources); } - - fn run_executor_and_validate(executor: &mut ParallelExecutor, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) { - executor.prepare(schedule, world); - - assert_eq!( - executor.stages[0].system_dependents, - vec![vec![2], vec![], vec![3], vec![]] - ); - assert_eq!( - executor.stages[1].system_dependents, - vec![vec![1], vec![2], vec![]] - ); - - let stage_0_len = executor.stages[0].system_dependencies.len(); - let mut read_u32_write_u64_deps = FixedBitSet::with_capacity(stage_0_len); - read_u32_write_u64_deps.insert(0); - let mut read_u64_deps = FixedBitSet::with_capacity(stage_0_len); - read_u64_deps.insert(2); - - assert_eq!( - executor.stages[0].system_dependencies, - vec![ - FixedBitSet::with_capacity(stage_0_len), - FixedBitSet::with_capacity(stage_0_len), - read_u32_write_u64_deps, - read_u64_deps, - ] - ); - - let stage_1_len = executor.stages[1].system_dependencies.len(); - let mut thread_local_deps = FixedBitSet::with_capacity(stage_1_len); - thread_local_deps.insert(0); - let mut write_f64_deps = FixedBitSet::with_capacity(stage_1_len); - write_f64_deps.insert(1); - assert_eq!( - executor.stages[1].system_dependencies, - vec![ - FixedBitSet::with_capacity(stage_1_len), - thread_local_deps, - write_f64_deps - ] - ); - - executor.run(schedule, world, resources); - - let counter = resources.get::().unwrap(); - assert_eq!( - *counter.count.lock().unwrap(), - 7, - "counter should have been incremented once for each system" - ); - } } diff --git a/crates/bevy_ecs/src/resource_query.rs b/crates/bevy_ecs/src/resource_query.rs index fd05550570..73cb5755bd 100644 --- a/crates/bevy_ecs/src/resource_query.rs +++ b/crates/bevy_ecs/src/resource_query.rs @@ -1,4 +1,4 @@ -use crate::{resources::FromResources, system::SystemId, Archetype, Component, Resources}; +use crate::{resources::FromResources, system::{TypeAccess, SystemId}, Archetype, Component, Resources}; use core::{ any::TypeId, ops::{Deref, DerefMut}, @@ -148,6 +148,7 @@ pub trait FetchResource<'a>: Sized { /// Type of value to be fetched type Item: UnsafeClone; + fn access() -> TypeAccess; fn borrow(resource_archetypes: &HashMap); fn release(resource_archetypes: &HashMap); @@ -180,6 +181,11 @@ impl<'a, T: Component> FetchResource<'a> for FetchResourceRead { archetype.release::(); } } + fn access() -> TypeAccess { + let mut access = TypeAccess::default(); + access.immutable.insert(TypeId::of::()); + access + } } impl<'a, T: Component> ResourceQuery for ResMut<'a, T> { @@ -204,6 +210,11 @@ impl<'a, T: Component> FetchResource<'a> for FetchResourceWrite { archetype.release_mut::(); } } + fn access() -> TypeAccess { + let mut access = TypeAccess::default(); + access.mutable.insert(TypeId::of::()); + access + } } impl<'a, T: Component + FromResources> ResourceQuery for Local<'a, T> { @@ -248,6 +259,11 @@ impl<'a, T: Component + FromResources> FetchResource<'a> for FetchResourceLocalM archetype.release_mut::(); } } + fn access() -> TypeAccess { + let mut access = TypeAccess::default(); + access.mutable.insert(TypeId::of::()); + access + } } macro_rules! tuple_impl { @@ -269,6 +285,13 @@ macro_rules! tuple_impl { unsafe fn get(resources: &'a Resources, system_id: Option) -> Self::Item { ($($name::get(resources, system_id),)*) } + + #[allow(unused_mut)] + fn access() -> TypeAccess { + let mut access = TypeAccess::default(); + $(access.union(&$name::access());)* + access + } } impl<$($name: ResourceQuery),*> ResourceQuery for ($($name,)*) { diff --git a/crates/bevy_ecs/src/schedule.rs b/crates/bevy_ecs/src/schedule.rs index 24c173d2af..02a7723e73 100644 --- a/crates/bevy_ecs/src/schedule.rs +++ b/crates/bevy_ecs/src/schedule.rs @@ -12,7 +12,8 @@ pub struct Schedule { pub(crate) stages: HashMap, Vec>>>>, pub(crate) stage_order: Vec>, pub(crate) system_ids: HashSet, - is_dirty: bool, + generation: usize, + last_initialize_generation: usize, } impl Schedule { @@ -92,7 +93,7 @@ impl Schedule { self.system_ids.insert(system.id()); systems.push(Arc::new(Mutex::new(system))); - self.is_dirty = true; + self.generation += 1; self } @@ -131,8 +132,9 @@ impl Schedule { } } + // TODO: move this code to ParallelExecutor pub fn initialize(&mut self, resources: &mut Resources) { - if !self.is_dirty { + if self.last_initialize_generation == self.generation { return; } @@ -143,6 +145,10 @@ impl Schedule { } } - self.is_dirty = false; + self.last_initialize_generation = self.generation; + } + + pub fn generation(&self) -> usize { + self.generation } } \ No newline at end of file diff --git a/crates/bevy_ecs/src/system.rs b/crates/bevy_ecs/src/system.rs index 6c3659eb6f..c1ff22d881 100644 --- a/crates/bevy_ecs/src/system.rs +++ b/crates/bevy_ecs/src/system.rs @@ -1,7 +1,7 @@ use crate::{Resources, World}; use fixedbitset::FixedBitSet; use hecs::{Access, Query}; -use std::borrow::Cow; +use std::{any::TypeId, borrow::Cow, collections::HashSet}; #[derive(Copy, Clone)] pub enum ThreadLocalExecution { @@ -22,7 +22,8 @@ pub trait System: Send + Sync { fn name(&self) -> Cow<'static, str>; fn id(&self) -> SystemId; fn update_archetype_access(&mut self, world: &World); - fn get_archetype_access(&self) -> &ArchetypeAccess; + fn archetype_access(&self) -> &ArchetypeAccess; + fn resource_access(&self) -> &TypeAccess; fn thread_local_execution(&self) -> ThreadLocalExecution; fn run(&mut self, world: &World, resources: &Resources); fn run_thread_local(&mut self, world: &mut World, resources: &mut Resources); @@ -72,10 +73,36 @@ impl ArchetypeAccess { } } +#[derive(Debug, Default, Eq, PartialEq, Clone)] +pub struct TypeAccess { + pub immutable: HashSet, + pub mutable: HashSet, +} + +impl TypeAccess { + pub fn is_compatible(&self, other: &TypeAccess) -> bool { + self.mutable.is_disjoint(&other.mutable) + && self.mutable.is_disjoint(&other.immutable) + && self.immutable.is_disjoint(&other.mutable) + } + + pub fn union(&mut self, other: &TypeAccess) { + self.mutable.extend(&other.mutable); + self.immutable.extend(&other.immutable); + } + + pub fn clear(&mut self) { + self.immutable.clear(); + self.mutable.clear(); + } +} + #[cfg(test)] mod tests { - use super::ArchetypeAccess; + use super::{TypeAccess, ArchetypeAccess}; use hecs::World; + use crate::{ResourceQuery, FetchResource, Res, ResMut}; + use std::any::TypeId; struct A; struct B; @@ -106,4 +133,14 @@ mod tests { assert!(access.immutable.contains(e2_archetype)); assert!(access.immutable.contains(e3_archetype)); } + + #[test] + fn resource_query_access() { + let access = <<(Res, ResMut, Res) as ResourceQuery>::Fetch as FetchResource>::access(); + let mut expected_access = TypeAccess::default(); + expected_access.immutable.insert(TypeId::of::()); + expected_access.immutable.insert(TypeId::of::()); + expected_access.mutable.insert(TypeId::of::()); + assert_eq!(access, expected_access); + } } diff --git a/crates/bevy_render/Cargo.toml b/crates/bevy_render/Cargo.toml index c554d1c4e3..59dad35edd 100644 --- a/crates/bevy_render/Cargo.toml +++ b/crates/bevy_render/Cargo.toml @@ -31,7 +31,7 @@ serde = { version = "1", features = ["derive"] } bitflags = "1.0" smallvec = "1.4.0" # TODO: replace once_cell with std equivalent if/when this lands: https://github.com/rust-lang/rfcs/pull/2788 -once_cell = "1.3.1" +once_cell = "1.4.0" downcast-rs = "1.1.1" thiserror = "1.0" anyhow = "1.0" \ No newline at end of file diff --git a/crates/bevy_render/src/draw.rs b/crates/bevy_render/src/draw.rs index ef1d585715..c890565adf 100644 --- a/crates/bevy_render/src/draw.rs +++ b/crates/bevy_render/src/draw.rs @@ -13,7 +13,7 @@ use crate::{ use bevy_asset::{Assets, Handle}; use bevy_ecs::{ resource_query::UnsafeClone, Archetype, FetchResource, Query, Res, ResMut, ResourceQuery, - Resources, SystemId, + Resources, SystemId, TypeAccess, }; use bevy_property::Properties; use std::{any::TypeId, collections::HashMap, ops::Range, sync::Arc}; @@ -148,6 +148,7 @@ impl<'a> ResourceQuery for DrawContext<'a> { pub struct FetchDrawContext; +// TODO: derive this impl impl<'a> FetchResource<'a> for FetchDrawContext { type Item = DrawContext<'a>; fn borrow(resource_archetypes: &HashMap) { @@ -213,6 +214,23 @@ impl<'a> FetchResource<'a> for FetchDrawContext { current_pipeline: None, } } + + fn access() -> TypeAccess { + let mut access = TypeAccess::default(); + access + .mutable + .insert(TypeId::of::>()); + access.mutable.insert(TypeId::of::>()); + access.mutable.insert(TypeId::of::()); + access + .immutable + .insert(TypeId::of::>()); + access + .immutable + .insert(TypeId::of::()); + access.immutable.insert(TypeId::of::()); + access + } } impl<'a> DrawContext<'a> {