ecs: make parallel executor resource-aware
This commit is contained in:
parent
88781007b0
commit
4712e96aa8
@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
resource_query::{FetchResource, ResourceQuery, UnsafeClone},
|
||||
system::{ArchetypeAccess, System, SystemId, ThreadLocalExecution},
|
||||
Commands, Resources,
|
||||
Commands, Resources, TypeAccess,
|
||||
};
|
||||
use core::marker::PhantomData;
|
||||
use hecs::{
|
||||
@ -22,6 +22,7 @@ where
|
||||
pub thread_local_func: ThreadLocalF,
|
||||
pub init_func: Init,
|
||||
pub thread_local_execution: ThreadLocalExecution,
|
||||
pub resource_access: TypeAccess,
|
||||
pub name: Cow<'static, str>,
|
||||
pub id: SystemId,
|
||||
pub archetype_access: ArchetypeAccess,
|
||||
@ -45,9 +46,13 @@ where
|
||||
(self.set_archetype_access)(world, &mut self.archetype_access, &mut self.state);
|
||||
}
|
||||
|
||||
fn get_archetype_access(&self) -> &ArchetypeAccess {
|
||||
fn archetype_access(&self) -> &ArchetypeAccess {
|
||||
&self.archetype_access
|
||||
}
|
||||
|
||||
fn resource_access(&self) -> &TypeAccess {
|
||||
&self.resource_access
|
||||
}
|
||||
|
||||
fn thread_local_execution(&self) -> ThreadLocalExecution {
|
||||
self.thread_local_execution
|
||||
@ -115,6 +120,7 @@ macro_rules! impl_into_foreach_system {
|
||||
init_func: move |resources| {
|
||||
<($($resource,)*)>::initialize(resources, Some(id));
|
||||
},
|
||||
resource_access: <<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::access(),
|
||||
archetype_access: ArchetypeAccess::default(),
|
||||
set_archetype_access: |world, archetype_access, _state| {
|
||||
archetype_access.clear();
|
||||
@ -253,6 +259,7 @@ macro_rules! impl_into_query_system {
|
||||
init_func: move |resources| {
|
||||
<($($resource,)*)>::initialize(resources, Some(id));
|
||||
},
|
||||
resource_access: <<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::access(),
|
||||
archetype_access: ArchetypeAccess::default(),
|
||||
set_archetype_access: |world, archetype_access, state| {
|
||||
archetype_access.clear();
|
||||
@ -378,6 +385,7 @@ where
|
||||
thread_local_execution: ThreadLocalExecution::Immediate,
|
||||
name: core::any::type_name::<F>().into(),
|
||||
id: SystemId::new(),
|
||||
resource_access: TypeAccess::default(),
|
||||
archetype_access: ArchetypeAccess::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@ -17,5 +17,5 @@ pub use resource_query::{FetchResource, Local, Res, ResMut, ResourceQuery};
|
||||
pub use resources::{FromResources, Resource, Resources};
|
||||
pub use schedule::Schedule;
|
||||
pub use parallel_executor::ParallelExecutor;
|
||||
pub use system::{System, SystemId};
|
||||
pub use system::{System, SystemId, TypeAccess, ArchetypeAccess};
|
||||
pub use world_builder::{WorldBuilder, WorldBuilderSource};
|
||||
|
||||
@ -1,7 +1,10 @@
|
||||
use crate::{system::{ArchetypeAccess, ThreadLocalExecution}, Resources, Schedule, System};
|
||||
use crate::{
|
||||
system::{ArchetypeAccess, ThreadLocalExecution},
|
||||
Resources, Schedule, System, TypeAccess,
|
||||
};
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use fixedbitset::FixedBitSet;
|
||||
use hecs::{World};
|
||||
use hecs::World;
|
||||
use rayon::ScopeFifo;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
@ -12,14 +15,12 @@ pub struct ParallelExecutor {
|
||||
|
||||
impl ParallelExecutor {
|
||||
pub fn prepare(&mut self, schedule: &mut Schedule, world: &World) {
|
||||
// TODO: if world archetype generation hasnt changed, dont update here
|
||||
|
||||
let mut executor_stages = vec![ExecutorStage::default(); schedule.stage_order.len()];
|
||||
let schedule_generation = schedule.generation();
|
||||
for (stage_index, stage_name) in schedule.stage_order.iter().enumerate() {
|
||||
let executor_stage = &mut executor_stages[stage_index];
|
||||
// ensure finished dependencies has the required number of bits
|
||||
if let Some(systems) = schedule.stages.get(stage_name) {
|
||||
executor_stage.prepare(world, systems);
|
||||
executor_stage.prepare(world, systems, schedule_generation);
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,13 +44,13 @@ pub struct ExecutorStage {
|
||||
system_dependencies: Vec<FixedBitSet>,
|
||||
/// each system's dependents (the systems that can't run until this system has run)
|
||||
system_dependents: Vec<Vec<usize>>,
|
||||
|
||||
/// the currently finished systems
|
||||
finished_systems: FixedBitSet,
|
||||
running_systems: FixedBitSet,
|
||||
|
||||
sender: Sender<usize>,
|
||||
receiver: Receiver<usize>,
|
||||
last_prepare_schedule_generation: usize,
|
||||
}
|
||||
|
||||
impl Default for ExecutorStage {
|
||||
@ -62,6 +63,7 @@ impl Default for ExecutorStage {
|
||||
running_systems: Default::default(),
|
||||
sender,
|
||||
receiver,
|
||||
last_prepare_schedule_generation: usize::MAX, // MAX forces prepare to run the first time
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -77,63 +79,104 @@ enum RunReadyType {
|
||||
}
|
||||
|
||||
impl ExecutorStage {
|
||||
pub fn prepare(&mut self, world: &World, systems: &Vec<Arc<Mutex<Box<dyn System>>>>) {
|
||||
self.system_dependencies = vec![FixedBitSet::with_capacity(systems.len()); systems.len()];
|
||||
self.system_dependents = vec![Vec::new(); systems.len()];
|
||||
self.finished_systems.grow(systems.len());
|
||||
self.running_systems.grow(systems.len());
|
||||
// TODO: add a start_index parameter here
|
||||
pub fn prepare(
|
||||
&mut self,
|
||||
world: &World,
|
||||
systems: &Vec<Arc<Mutex<Box<dyn System>>>>,
|
||||
schedule_generation: usize,
|
||||
// last_world_generation
|
||||
// last_world_generation_start_index <- for cases where we do a partial update midway through execution (then need to update everything that came before in the next execution)
|
||||
) {
|
||||
let schedule_changed = self.last_prepare_schedule_generation != schedule_generation;
|
||||
|
||||
// update each system's archetype access to latest world archetypes
|
||||
for system in systems.iter() {
|
||||
let mut system = system.lock().unwrap();
|
||||
system.update_archetype_access(world);
|
||||
// if the schedule has changed, clear executor state / fill it with new defaults
|
||||
if schedule_changed {
|
||||
self.system_dependencies.clear();
|
||||
self.system_dependencies
|
||||
.resize_with(systems.len(), || FixedBitSet::with_capacity(systems.len()));
|
||||
|
||||
self.system_dependents.clear();
|
||||
self.system_dependents.resize(systems.len(), Vec::new());
|
||||
|
||||
self.finished_systems.grow(systems.len());
|
||||
self.running_systems.grow(systems.len());
|
||||
}
|
||||
|
||||
let mut current_archetype_access = ArchetypeAccess::default();
|
||||
let mut last_thread_local_index: Option<usize> = None;
|
||||
for (system_index, system) in systems.iter().enumerate() {
|
||||
let system = system.lock().unwrap();
|
||||
let archetype_access = system.get_archetype_access();
|
||||
match system.thread_local_execution() {
|
||||
ThreadLocalExecution::NextFlush => {
|
||||
// if any system before this one conflicts, check all systems that came before for compatibility
|
||||
if current_archetype_access.is_compatible(archetype_access) == false {
|
||||
for earlier_system_index in 0..system_index {
|
||||
let earlier_system = systems[earlier_system_index].lock().unwrap();
|
||||
// TODO: check archetype generation here
|
||||
let world_generation_changed = true;
|
||||
|
||||
// ignore "immediate" thread local systems, we handle them separately
|
||||
if let ThreadLocalExecution::Immediate =
|
||||
earlier_system.thread_local_execution()
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if world_generation_changed {
|
||||
// update each system's archetype access to latest world archetypes
|
||||
for system in systems.iter() {
|
||||
let mut system = system.lock().unwrap();
|
||||
system.update_archetype_access(world);
|
||||
}
|
||||
}
|
||||
|
||||
let earlier_archetype_access = earlier_system.get_archetype_access();
|
||||
// if earlier system is incompatible, make the current system dependent
|
||||
if earlier_archetype_access.is_compatible(archetype_access) == false {
|
||||
self.system_dependents[earlier_system_index].push(system_index);
|
||||
self.system_dependencies[system_index].insert(earlier_system_index);
|
||||
if schedule_changed || world_generation_changed {
|
||||
// calculate dependencies between systems and build execution order
|
||||
let mut current_archetype_access = ArchetypeAccess::default();
|
||||
let mut current_resource_access = TypeAccess::default();
|
||||
let mut last_thread_local_index: Option<usize> = None;
|
||||
for (system_index, system) in systems.iter().enumerate() {
|
||||
let system = system.lock().unwrap();
|
||||
let archetype_access = system.archetype_access();
|
||||
match system.thread_local_execution() {
|
||||
ThreadLocalExecution::NextFlush => {
|
||||
let resource_access = system.resource_access();
|
||||
// if any system before this one conflicts, check all systems that came before for compatibility
|
||||
if current_archetype_access.is_compatible(archetype_access) == false
|
||||
|| current_resource_access.is_compatible(resource_access) == false
|
||||
{
|
||||
for earlier_system_index in 0..system_index {
|
||||
let earlier_system = systems[earlier_system_index].lock().unwrap();
|
||||
|
||||
// ignore "immediate" thread local systems, we handle them separately
|
||||
if let ThreadLocalExecution::Immediate =
|
||||
earlier_system.thread_local_execution()
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// if earlier system is incompatible, make the current system dependent
|
||||
if earlier_system
|
||||
.archetype_access()
|
||||
.is_compatible(archetype_access)
|
||||
== false
|
||||
|| earlier_system
|
||||
.resource_access()
|
||||
.is_compatible(resource_access)
|
||||
== false
|
||||
{
|
||||
self.system_dependents[earlier_system_index].push(system_index);
|
||||
self.system_dependencies[system_index]
|
||||
.insert(earlier_system_index);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
current_archetype_access.union(archetype_access);
|
||||
current_archetype_access.union(archetype_access);
|
||||
current_resource_access.union(resource_access);
|
||||
|
||||
if let Some(last_thread_local_index) = last_thread_local_index {
|
||||
self.system_dependents[last_thread_local_index].push(system_index);
|
||||
self.system_dependencies[system_index].insert(last_thread_local_index);
|
||||
if let Some(last_thread_local_index) = last_thread_local_index {
|
||||
self.system_dependents[last_thread_local_index].push(system_index);
|
||||
self.system_dependencies[system_index].insert(last_thread_local_index);
|
||||
}
|
||||
}
|
||||
}
|
||||
ThreadLocalExecution::Immediate => {
|
||||
last_thread_local_index = Some(system_index);
|
||||
for earlier_system_index in 0..system_index {
|
||||
// treat all earlier systems as "incompatible" to ensure we run this thread local system exclusively
|
||||
self.system_dependents[earlier_system_index].push(system_index);
|
||||
self.system_dependencies[system_index].insert(earlier_system_index);
|
||||
ThreadLocalExecution::Immediate => {
|
||||
last_thread_local_index = Some(system_index);
|
||||
for earlier_system_index in 0..system_index {
|
||||
// treat all earlier systems as "incompatible" to ensure we run this thread local system exclusively
|
||||
self.system_dependents[earlier_system_index].push(system_index);
|
||||
self.system_dependencies[system_index].insert(earlier_system_index);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.last_prepare_schedule_generation = schedule_generation;
|
||||
}
|
||||
|
||||
fn run_ready_systems<'run>(
|
||||
@ -207,6 +250,7 @@ impl ExecutorStage {
|
||||
) {
|
||||
self.finished_systems.clear();
|
||||
self.running_systems.clear();
|
||||
|
||||
let mut run_ready_result = RunReadyResult::Ok;
|
||||
rayon::scope_fifo(|scope| {
|
||||
run_ready_result =
|
||||
@ -228,7 +272,7 @@ impl ExecutorStage {
|
||||
self.sender.send(thread_local_index).unwrap();
|
||||
|
||||
// TODO: if archetype generation has changed, call "prepare" on all systems after this one
|
||||
|
||||
|
||||
run_ready_result = RunReadyResult::Ok;
|
||||
} else {
|
||||
// wait for a system to finish, then run its dependents
|
||||
@ -272,7 +316,7 @@ impl ExecutorStage {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::ParallelExecutor;
|
||||
use crate::{IntoQuerySystem, IntoThreadLocalSystem, Query, Res, Resources, Schedule, World};
|
||||
use crate::{IntoQuerySystem, IntoThreadLocalSystem, Query, Res, Resources, Schedule, World, ResMut};
|
||||
use fixedbitset::FixedBitSet;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
@ -281,148 +325,224 @@ mod tests {
|
||||
count: Arc<Mutex<usize>>,
|
||||
}
|
||||
|
||||
fn read_u32(counter: Res<Counter>, _query: Query<&u32>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert!(
|
||||
*count < 2,
|
||||
"read_32 should be one of the first two systems to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn write_float(counter: Res<Counter>, _query: Query<&f32>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert!(
|
||||
*count < 2,
|
||||
"write_float should be one of the first two systems to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn read_u32_write_u64(counter: Res<Counter>, _query: Query<(&u32, &mut u64)>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 2,
|
||||
"read_u32_write_u64 should always be the third system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn read_u64(counter: Res<Counter>, _query: Query<&u64>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 3,
|
||||
"read_u64 should always be the fourth system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn write_u64(counter: Res<Counter>, _query: Query<&mut u64>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 4,
|
||||
"write_u64 should always be the fifth system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn thread_local_system(_world: &mut World, resources: &mut Resources) {
|
||||
let counter = resources.get::<Counter>().unwrap();
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 5,
|
||||
"thread_local_system should always be the sixth system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn write_f32(counter: Res<Counter>, _query: Query<&mut f32>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 6,
|
||||
"write_f32 should always be the seventh system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schedule() {
|
||||
let mut world = World::new();
|
||||
let mut resources = Resources::default();
|
||||
resources.insert(Counter::default());
|
||||
resources.insert(1.0f64);
|
||||
resources.insert(2isize);
|
||||
|
||||
world.spawn((1.0f32,));
|
||||
world.spawn((1u32, 1u64));
|
||||
world.spawn((2u32,));
|
||||
|
||||
let mut schedule = Schedule::default();
|
||||
schedule.add_stage("A");
|
||||
schedule.add_stage("B");
|
||||
schedule.add_stage("A"); // component queries
|
||||
schedule.add_stage("B"); // thread local
|
||||
schedule.add_stage("C"); // resources
|
||||
|
||||
// A systems
|
||||
|
||||
fn read_u32(counter: Res<Counter>, _query: Query<&u32>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert!(
|
||||
*count < 2,
|
||||
"should be one of the first two systems to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn write_float(counter: Res<Counter>, _query: Query<&f32>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert!(
|
||||
*count < 2,
|
||||
"should be one of the first two systems to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn read_u32_write_u64(counter: Res<Counter>, _query: Query<(&u32, &mut u64)>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 2,
|
||||
"should always be the 3rd system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn read_u64(counter: Res<Counter>, _query: Query<&u64>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 3,
|
||||
"should always be the 4th system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
schedule.add_system_to_stage("A", read_u32.system());
|
||||
schedule.add_system_to_stage("A", write_float.system());
|
||||
schedule.add_system_to_stage("A", read_u32_write_u64.system());
|
||||
schedule.add_system_to_stage("A", read_u64.system());
|
||||
|
||||
// B systems
|
||||
|
||||
fn write_u64(counter: Res<Counter>, _query: Query<&mut u64>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 4,
|
||||
"should always be the 5th system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn thread_local_system(_world: &mut World, resources: &mut Resources) {
|
||||
let counter = resources.get::<Counter>().unwrap();
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 5,
|
||||
"should always be the 6th system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn write_f32(counter: Res<Counter>, _query: Query<&mut f32>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 6,
|
||||
"should always be the 7th system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
schedule.add_system_to_stage("B", write_u64.system());
|
||||
schedule.add_system_to_stage("B", thread_local_system.thread_local_system());
|
||||
schedule.add_system_to_stage("B", write_f32.system());
|
||||
|
||||
|
||||
// C systems
|
||||
|
||||
fn read_f64_res(counter: Res<Counter>, _f64_res: Res<f64>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert!(
|
||||
7 == *count || *count == 8,
|
||||
"should always be the 8th or 9th system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn read_isize_res(counter: Res<Counter>, _isize_res: Res<isize>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert!(
|
||||
7 == *count || *count == 8,
|
||||
"should always be the 8th or 9th system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn read_isize_write_f64_res(counter: Res<Counter>, _isize_res: Res<isize>, _f64_res: ResMut<f64>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 9,
|
||||
"should always be the 10th system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn write_f64_res(counter: Res<Counter>, _f64_res: ResMut<f64>) {
|
||||
let mut count = counter.count.lock().unwrap();
|
||||
assert_eq!(
|
||||
*count, 10,
|
||||
"should always be the 11th system to run"
|
||||
);
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
schedule.add_system_to_stage("C", read_f64_res.system());
|
||||
schedule.add_system_to_stage("C", read_isize_res.system());
|
||||
schedule.add_system_to_stage("C", read_isize_write_f64_res.system());
|
||||
schedule.add_system_to_stage("C", write_f64_res.system());
|
||||
|
||||
fn run_executor_and_validate(
|
||||
executor: &mut ParallelExecutor,
|
||||
schedule: &mut Schedule,
|
||||
world: &mut World,
|
||||
resources: &mut Resources,
|
||||
) {
|
||||
executor.prepare(schedule, world);
|
||||
|
||||
assert_eq!(
|
||||
executor.stages[0].system_dependents,
|
||||
vec![vec![2], vec![], vec![3], vec![]]
|
||||
);
|
||||
assert_eq!(
|
||||
executor.stages[1].system_dependents,
|
||||
vec![vec![1], vec![2], vec![]]
|
||||
);
|
||||
assert_eq!(
|
||||
executor.stages[2].system_dependents,
|
||||
vec![vec![2], vec![2], vec![3], vec![]]
|
||||
);
|
||||
|
||||
let stage_0_len = executor.stages[0].system_dependencies.len();
|
||||
let mut read_u32_write_u64_deps = FixedBitSet::with_capacity(stage_0_len);
|
||||
read_u32_write_u64_deps.insert(0);
|
||||
let mut read_u64_deps = FixedBitSet::with_capacity(stage_0_len);
|
||||
read_u64_deps.insert(2);
|
||||
|
||||
assert_eq!(
|
||||
executor.stages[0].system_dependencies,
|
||||
vec![
|
||||
FixedBitSet::with_capacity(stage_0_len),
|
||||
FixedBitSet::with_capacity(stage_0_len),
|
||||
read_u32_write_u64_deps,
|
||||
read_u64_deps,
|
||||
]
|
||||
);
|
||||
|
||||
let stage_1_len = executor.stages[1].system_dependencies.len();
|
||||
let mut thread_local_deps = FixedBitSet::with_capacity(stage_1_len);
|
||||
thread_local_deps.insert(0);
|
||||
let mut write_f64_deps = FixedBitSet::with_capacity(stage_1_len);
|
||||
write_f64_deps.insert(1);
|
||||
assert_eq!(
|
||||
executor.stages[1].system_dependencies,
|
||||
vec![
|
||||
FixedBitSet::with_capacity(stage_1_len),
|
||||
thread_local_deps,
|
||||
write_f64_deps
|
||||
]
|
||||
);
|
||||
|
||||
let stage_2_len = executor.stages[2].system_dependencies.len();
|
||||
let mut read_isize_write_f64_res_deps = FixedBitSet::with_capacity(stage_2_len);
|
||||
read_isize_write_f64_res_deps.insert(0);
|
||||
let mut write_f64_res_deps = FixedBitSet::with_capacity(stage_2_len);
|
||||
write_f64_res_deps.insert(2);
|
||||
assert_eq!(
|
||||
executor.stages[2].system_dependencies,
|
||||
vec![
|
||||
FixedBitSet::with_capacity(stage_2_len),
|
||||
FixedBitSet::with_capacity(stage_2_len),
|
||||
read_isize_write_f64_res_deps,
|
||||
write_f64_res_deps
|
||||
]
|
||||
);
|
||||
|
||||
executor.run(schedule, world, resources);
|
||||
|
||||
let counter = resources.get::<Counter>().unwrap();
|
||||
assert_eq!(
|
||||
*counter.count.lock().unwrap(),
|
||||
11,
|
||||
"counter should have been incremented once for each system"
|
||||
);
|
||||
}
|
||||
|
||||
let mut executor = ParallelExecutor::default();
|
||||
run_executor_and_validate(&mut executor, &mut schedule, &mut world, &mut resources);
|
||||
// run again (with counter reset) to ensure executor works correctly across runs
|
||||
*resources.get::<Counter>().unwrap().count.lock().unwrap() = 0;
|
||||
run_executor_and_validate(&mut executor, &mut schedule, &mut world, &mut resources);
|
||||
}
|
||||
|
||||
fn run_executor_and_validate(executor: &mut ParallelExecutor, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) {
|
||||
executor.prepare(schedule, world);
|
||||
|
||||
assert_eq!(
|
||||
executor.stages[0].system_dependents,
|
||||
vec![vec![2], vec![], vec![3], vec![]]
|
||||
);
|
||||
assert_eq!(
|
||||
executor.stages[1].system_dependents,
|
||||
vec![vec![1], vec![2], vec![]]
|
||||
);
|
||||
|
||||
let stage_0_len = executor.stages[0].system_dependencies.len();
|
||||
let mut read_u32_write_u64_deps = FixedBitSet::with_capacity(stage_0_len);
|
||||
read_u32_write_u64_deps.insert(0);
|
||||
let mut read_u64_deps = FixedBitSet::with_capacity(stage_0_len);
|
||||
read_u64_deps.insert(2);
|
||||
|
||||
assert_eq!(
|
||||
executor.stages[0].system_dependencies,
|
||||
vec![
|
||||
FixedBitSet::with_capacity(stage_0_len),
|
||||
FixedBitSet::with_capacity(stage_0_len),
|
||||
read_u32_write_u64_deps,
|
||||
read_u64_deps,
|
||||
]
|
||||
);
|
||||
|
||||
let stage_1_len = executor.stages[1].system_dependencies.len();
|
||||
let mut thread_local_deps = FixedBitSet::with_capacity(stage_1_len);
|
||||
thread_local_deps.insert(0);
|
||||
let mut write_f64_deps = FixedBitSet::with_capacity(stage_1_len);
|
||||
write_f64_deps.insert(1);
|
||||
assert_eq!(
|
||||
executor.stages[1].system_dependencies,
|
||||
vec![
|
||||
FixedBitSet::with_capacity(stage_1_len),
|
||||
thread_local_deps,
|
||||
write_f64_deps
|
||||
]
|
||||
);
|
||||
|
||||
executor.run(schedule, world, resources);
|
||||
|
||||
let counter = resources.get::<Counter>().unwrap();
|
||||
assert_eq!(
|
||||
*counter.count.lock().unwrap(),
|
||||
7,
|
||||
"counter should have been incremented once for each system"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use crate::{resources::FromResources, system::SystemId, Archetype, Component, Resources};
|
||||
use crate::{resources::FromResources, system::{TypeAccess, SystemId}, Archetype, Component, Resources};
|
||||
use core::{
|
||||
any::TypeId,
|
||||
ops::{Deref, DerefMut},
|
||||
@ -148,6 +148,7 @@ pub trait FetchResource<'a>: Sized {
|
||||
/// Type of value to be fetched
|
||||
type Item: UnsafeClone;
|
||||
|
||||
fn access() -> TypeAccess;
|
||||
fn borrow(resource_archetypes: &HashMap<TypeId, Archetype>);
|
||||
fn release(resource_archetypes: &HashMap<TypeId, Archetype>);
|
||||
|
||||
@ -180,6 +181,11 @@ impl<'a, T: Component> FetchResource<'a> for FetchResourceRead<T> {
|
||||
archetype.release::<T>();
|
||||
}
|
||||
}
|
||||
fn access() -> TypeAccess {
|
||||
let mut access = TypeAccess::default();
|
||||
access.immutable.insert(TypeId::of::<T>());
|
||||
access
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Component> ResourceQuery for ResMut<'a, T> {
|
||||
@ -204,6 +210,11 @@ impl<'a, T: Component> FetchResource<'a> for FetchResourceWrite<T> {
|
||||
archetype.release_mut::<T>();
|
||||
}
|
||||
}
|
||||
fn access() -> TypeAccess {
|
||||
let mut access = TypeAccess::default();
|
||||
access.mutable.insert(TypeId::of::<T>());
|
||||
access
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Component + FromResources> ResourceQuery for Local<'a, T> {
|
||||
@ -248,6 +259,11 @@ impl<'a, T: Component + FromResources> FetchResource<'a> for FetchResourceLocalM
|
||||
archetype.release_mut::<T>();
|
||||
}
|
||||
}
|
||||
fn access() -> TypeAccess {
|
||||
let mut access = TypeAccess::default();
|
||||
access.mutable.insert(TypeId::of::<T>());
|
||||
access
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! tuple_impl {
|
||||
@ -269,6 +285,13 @@ macro_rules! tuple_impl {
|
||||
unsafe fn get(resources: &'a Resources, system_id: Option<SystemId>) -> Self::Item {
|
||||
($($name::get(resources, system_id),)*)
|
||||
}
|
||||
|
||||
#[allow(unused_mut)]
|
||||
fn access() -> TypeAccess {
|
||||
let mut access = TypeAccess::default();
|
||||
$(access.union(&$name::access());)*
|
||||
access
|
||||
}
|
||||
}
|
||||
|
||||
impl<$($name: ResourceQuery),*> ResourceQuery for ($($name,)*) {
|
||||
|
||||
@ -12,7 +12,8 @@ pub struct Schedule {
|
||||
pub(crate) stages: HashMap<Cow<'static, str>, Vec<Arc<Mutex<Box<dyn System>>>>>,
|
||||
pub(crate) stage_order: Vec<Cow<'static, str>>,
|
||||
pub(crate) system_ids: HashSet<SystemId>,
|
||||
is_dirty: bool,
|
||||
generation: usize,
|
||||
last_initialize_generation: usize,
|
||||
}
|
||||
|
||||
impl Schedule {
|
||||
@ -92,7 +93,7 @@ impl Schedule {
|
||||
self.system_ids.insert(system.id());
|
||||
systems.push(Arc::new(Mutex::new(system)));
|
||||
|
||||
self.is_dirty = true;
|
||||
self.generation += 1;
|
||||
self
|
||||
}
|
||||
|
||||
@ -131,8 +132,9 @@ impl Schedule {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: move this code to ParallelExecutor
|
||||
pub fn initialize(&mut self, resources: &mut Resources) {
|
||||
if !self.is_dirty {
|
||||
if self.last_initialize_generation == self.generation {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -143,6 +145,10 @@ impl Schedule {
|
||||
}
|
||||
}
|
||||
|
||||
self.is_dirty = false;
|
||||
self.last_initialize_generation = self.generation;
|
||||
}
|
||||
|
||||
pub fn generation(&self) -> usize {
|
||||
self.generation
|
||||
}
|
||||
}
|
||||
@ -1,7 +1,7 @@
|
||||
use crate::{Resources, World};
|
||||
use fixedbitset::FixedBitSet;
|
||||
use hecs::{Access, Query};
|
||||
use std::borrow::Cow;
|
||||
use std::{any::TypeId, borrow::Cow, collections::HashSet};
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub enum ThreadLocalExecution {
|
||||
@ -22,7 +22,8 @@ pub trait System: Send + Sync {
|
||||
fn name(&self) -> Cow<'static, str>;
|
||||
fn id(&self) -> SystemId;
|
||||
fn update_archetype_access(&mut self, world: &World);
|
||||
fn get_archetype_access(&self) -> &ArchetypeAccess;
|
||||
fn archetype_access(&self) -> &ArchetypeAccess;
|
||||
fn resource_access(&self) -> &TypeAccess;
|
||||
fn thread_local_execution(&self) -> ThreadLocalExecution;
|
||||
fn run(&mut self, world: &World, resources: &Resources);
|
||||
fn run_thread_local(&mut self, world: &mut World, resources: &mut Resources);
|
||||
@ -72,10 +73,36 @@ impl ArchetypeAccess {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Eq, PartialEq, Clone)]
|
||||
pub struct TypeAccess {
|
||||
pub immutable: HashSet<TypeId>,
|
||||
pub mutable: HashSet<TypeId>,
|
||||
}
|
||||
|
||||
impl TypeAccess {
|
||||
pub fn is_compatible(&self, other: &TypeAccess) -> bool {
|
||||
self.mutable.is_disjoint(&other.mutable)
|
||||
&& self.mutable.is_disjoint(&other.immutable)
|
||||
&& self.immutable.is_disjoint(&other.mutable)
|
||||
}
|
||||
|
||||
pub fn union(&mut self, other: &TypeAccess) {
|
||||
self.mutable.extend(&other.mutable);
|
||||
self.immutable.extend(&other.immutable);
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.immutable.clear();
|
||||
self.mutable.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::ArchetypeAccess;
|
||||
use super::{TypeAccess, ArchetypeAccess};
|
||||
use hecs::World;
|
||||
use crate::{ResourceQuery, FetchResource, Res, ResMut};
|
||||
use std::any::TypeId;
|
||||
|
||||
struct A;
|
||||
struct B;
|
||||
@ -106,4 +133,14 @@ mod tests {
|
||||
assert!(access.immutable.contains(e2_archetype));
|
||||
assert!(access.immutable.contains(e3_archetype));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resource_query_access() {
|
||||
let access = <<(Res<A>, ResMut<B>, Res<C>) as ResourceQuery>::Fetch as FetchResource>::access();
|
||||
let mut expected_access = TypeAccess::default();
|
||||
expected_access.immutable.insert(TypeId::of::<A>());
|
||||
expected_access.immutable.insert(TypeId::of::<C>());
|
||||
expected_access.mutable.insert(TypeId::of::<B>());
|
||||
assert_eq!(access, expected_access);
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,7 +31,7 @@ serde = { version = "1", features = ["derive"] }
|
||||
bitflags = "1.0"
|
||||
smallvec = "1.4.0"
|
||||
# TODO: replace once_cell with std equivalent if/when this lands: https://github.com/rust-lang/rfcs/pull/2788
|
||||
once_cell = "1.3.1"
|
||||
once_cell = "1.4.0"
|
||||
downcast-rs = "1.1.1"
|
||||
thiserror = "1.0"
|
||||
anyhow = "1.0"
|
||||
@ -13,7 +13,7 @@ use crate::{
|
||||
use bevy_asset::{Assets, Handle};
|
||||
use bevy_ecs::{
|
||||
resource_query::UnsafeClone, Archetype, FetchResource, Query, Res, ResMut, ResourceQuery,
|
||||
Resources, SystemId,
|
||||
Resources, SystemId, TypeAccess,
|
||||
};
|
||||
use bevy_property::Properties;
|
||||
use std::{any::TypeId, collections::HashMap, ops::Range, sync::Arc};
|
||||
@ -148,6 +148,7 @@ impl<'a> ResourceQuery for DrawContext<'a> {
|
||||
|
||||
pub struct FetchDrawContext;
|
||||
|
||||
// TODO: derive this impl
|
||||
impl<'a> FetchResource<'a> for FetchDrawContext {
|
||||
type Item = DrawContext<'a>;
|
||||
fn borrow(resource_archetypes: &HashMap<TypeId, Archetype>) {
|
||||
@ -213,6 +214,23 @@ impl<'a> FetchResource<'a> for FetchDrawContext {
|
||||
current_pipeline: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn access() -> TypeAccess {
|
||||
let mut access = TypeAccess::default();
|
||||
access
|
||||
.mutable
|
||||
.insert(TypeId::of::<Assets<PipelineDescriptor>>());
|
||||
access.mutable.insert(TypeId::of::<Assets<Shader>>());
|
||||
access.mutable.insert(TypeId::of::<PipelineCompiler>());
|
||||
access
|
||||
.immutable
|
||||
.insert(TypeId::of::<Box<dyn RenderResourceContext>>());
|
||||
access
|
||||
.immutable
|
||||
.insert(TypeId::of::<VertexBufferDescriptors>());
|
||||
access.immutable.insert(TypeId::of::<SharedBuffers>());
|
||||
access
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> DrawContext<'a> {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user