ecs: only prepare executor on changes. use parallel executor in App

Carter Anderson 2020-07-15 17:59:13 -07:00
parent 4712e96aa8
commit 362fb92cf8
3 changed files with 64 additions and 75 deletions
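The change in a sentence: App now owns a ParallelExecutor for its main schedule and another for the startup schedule, and ParallelExecutor::prepare rebuilds its per-stage state only when the schedule's generation counter has moved since the last call, instead of rebuilding from scratch on every run. Below is a minimal standalone sketch of that generation check. It is not part of the commit; Schedule, ExecutorStage, and ParallelExecutor here are simplified stand-ins, not the real bevy_ecs types or signatures.

// Illustrative model only: a schedule that bumps a generation counter whenever
// its stages change, and an executor that re-prepares only when that counter moved.
struct Schedule {
    stage_order: Vec<String>,
    generation: usize,
}

impl Schedule {
    fn add_stage(&mut self, name: &str) {
        self.stage_order.push(name.to_string());
        self.generation += 1; // any structural change invalidates prepared state
    }
}

#[derive(Default)]
struct ExecutorStage; // stands in for the real cached per-stage state

struct ParallelExecutor {
    stages: Vec<ExecutorStage>,
    last_schedule_generation: usize,
}

impl Default for ParallelExecutor {
    fn default() -> Self {
        Self {
            stages: Vec::new(),
            last_schedule_generation: usize::MAX, // MAX forces prepare to run the first time
        }
    }
}

impl ParallelExecutor {
    /// Returns true when stage state was (re)built, false when the cache was reused.
    fn prepare(&mut self, schedule: &Schedule) -> bool {
        let schedule_changed = schedule.generation != self.last_schedule_generation;
        if schedule_changed {
            self.stages.clear();
            self.stages.resize_with(schedule.stage_order.len(), ExecutorStage::default);
        }
        self.last_schedule_generation = schedule.generation;
        schedule_changed
    }
}

fn main() {
    let mut schedule = Schedule { stage_order: Vec::new(), generation: 0 };
    schedule.add_stage("update");

    let mut executor = ParallelExecutor::default();
    assert!(executor.prepare(&schedule));  // first run: builds stage state
    assert!(!executor.prepare(&schedule)); // unchanged schedule: reuses it
    schedule.add_stage("render");
    assert!(executor.prepare(&schedule));  // schedule changed: rebuilds
    assert_eq!(executor.stages.len(), 2);
}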

View File

@@ -1,5 +1,5 @@
 use super::AppBuilder;
-use bevy_ecs::{Resources, Schedule, World};
+use bevy_ecs::{Resources, Schedule, World, ParallelExecutor};

 #[derive(Default)]
 pub struct App {
@@ -7,7 +7,9 @@ pub struct App {
     pub resources: Resources,
     pub runner: Option<Box<dyn Fn(App)>>,
     pub schedule: Schedule,
+    pub executor: ParallelExecutor,
     pub startup_schedule: Schedule,
+    pub startup_executor: ParallelExecutor,
 }

 impl App {
@@ -17,13 +19,12 @@ impl App {
     pub fn update(&mut self) {
         self.schedule.initialize(&mut self.resources);
-        self.schedule.run(&mut self.world, &mut self.resources);
+        self.executor.run(&mut self.schedule, &mut self.world, &mut self.resources);
     }

     pub fn run(mut self) {
         self.startup_schedule.initialize(&mut self.resources);
-        self.startup_schedule
-            .run(&mut self.world, &mut self.resources);
+        self.startup_executor.run(&mut self.startup_schedule, &mut self.world, &mut self.resources);

         if let Some(run) = self.runner.take() {
             run(self)
         }

View File

@@ -700,7 +700,7 @@ impl<A: DynamicBundle> core::iter::FromIterator<A> for World {
 /// Determines freshness of information derived from `World::archetypes`
 #[derive(Debug, Copy, Clone, Eq, PartialEq)]
-pub struct ArchetypesGeneration(u64);
+pub struct ArchetypesGeneration(pub u64);

 /// Entity IDs created by `World::spawn_batch`
 pub struct SpawnBatchIter<'a, I>
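Making the tuple field public is what lets code outside hecs construct a sentinel generation; the executor changes below rely on this with ArchetypesGeneration(u64::MAX). A tiny illustration, not part of the commit, with the type re-declared locally as a stand-in for the hecs item:

// Local stand-in for hecs::ArchetypesGeneration, shown only to illustrate the sentinel trick.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct ArchetypesGeneration(pub u64);

fn main() {
    // u64::MAX is a value real generations will not reach in practice, so the
    // first comparison always reports "changed" and forces an initial prepare.
    let never_prepared = ArchetypesGeneration(u64::MAX);
    let current = ArchetypesGeneration(0);
    assert_ne!(never_prepared, current);
}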

View File

@@ -4,27 +4,42 @@ use crate::{
 };
 use crossbeam_channel::{Receiver, Sender};
 use fixedbitset::FixedBitSet;
-use hecs::World;
+use hecs::{ArchetypesGeneration, World};
 use rayon::ScopeFifo;
 use std::sync::{Arc, Mutex};

-#[derive(Default)]
 pub struct ParallelExecutor {
     stages: Vec<ExecutorStage>,
+    last_schedule_generation: usize,
+}
+
+impl Default for ParallelExecutor {
+    fn default() -> Self {
+        Self {
+            stages: Default::default(),
+            last_schedule_generation: usize::MAX, // MAX forces prepare to run the first time
+        }
+    }
 }

 impl ParallelExecutor {
     pub fn prepare(&mut self, schedule: &mut Schedule, world: &World) {
-        let mut executor_stages = vec![ExecutorStage::default(); schedule.stage_order.len()];
         let schedule_generation = schedule.generation();
+        let schedule_changed = schedule_generation != self.last_schedule_generation;
+        if schedule_changed {
+            self.stages.clear();
+            self.stages.resize_with(schedule.stage_order.len(), || ExecutorStage::default());
+        }
         for (stage_index, stage_name) in schedule.stage_order.iter().enumerate() {
-            let executor_stage = &mut executor_stages[stage_index];
+            let executor_stage = &mut self.stages[stage_index];
             if let Some(systems) = schedule.stages.get(stage_name) {
-                executor_stage.prepare(world, systems, schedule_generation);
+                executor_stage.prepare(world, systems, schedule_changed);
             }
         }
-        self.stages = executor_stages;
+        self.last_schedule_generation = schedule_generation;
     }

     pub fn run(&mut self, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) {
@@ -50,7 +65,7 @@ pub struct ExecutorStage {
     sender: Sender<usize>,
     receiver: Receiver<usize>,
-    last_prepare_schedule_generation: usize,
+    last_archetypes_generation: ArchetypesGeneration,
 }

 impl Default for ExecutorStage {
@@ -63,7 +78,7 @@ impl Default for ExecutorStage {
             running_systems: Default::default(),
             sender,
             receiver,
-            last_prepare_schedule_generation: usize::MAX, // MAX forces prepare to run the first time
+            last_archetypes_generation: ArchetypesGeneration(u64::MAX), // MAX forces prepare to run the first time
         }
     }
 }
@@ -84,12 +99,8 @@ impl ExecutorStage {
         &mut self,
         world: &World,
         systems: &Vec<Arc<Mutex<Box<dyn System>>>>,
-        schedule_generation: usize,
-        // last_world_generation
-        // last_world_generation_start_index <- for cases where we do a partial update midway through execution (then need to update everything that came before in the next execution)
+        schedule_changed: bool,
     ) {
-        let schedule_changed = self.last_prepare_schedule_generation != schedule_generation;
-
         // if the schedule has changed, clear executor state / fill it with new defaults
         if schedule_changed {
             self.system_dependencies.clear();
@@ -103,18 +114,17 @@
             self.running_systems.grow(systems.len());
         }

-        // TODO: check archetype generation here
-        let world_generation_changed = true;
+        let archetypes_generation = world.archetypes_generation();
+        let archetypes_generation_changed =
+            self.last_archetypes_generation != archetypes_generation;

-        if world_generation_changed {
+        if schedule_changed || archetypes_generation_changed {
             // update each system's archetype access to latest world archetypes
             for system in systems.iter() {
                 let mut system = system.lock().unwrap();
                 system.update_archetype_access(world);
             }
-        }

-        if schedule_changed || world_generation_changed {
             // calculate dependencies between systems and build execution order
             let mut current_archetype_access = ArchetypeAccess::default();
             let mut current_resource_access = TypeAccess::default();
@@ -176,7 +186,7 @@
             }
         }

-        self.last_prepare_schedule_generation = schedule_generation;
+        self.last_archetypes_generation = archetypes_generation;
     }

     fn run_ready_systems<'run>(
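Taken together, the hunks above key ExecutorStage's cached archetype access and dependency graph on the world's archetype generation: the work is redone only when the schedule changed or new archetypes appeared, and the generation seen during prepare is recorded unconditionally at the end. A minimal standalone sketch of that gate, not part of the commit; Stage and its prepare signature are simplified assumptions, not the real API:

// Simplified model of the gate introduced above; field and type names mirror the
// diff, but the actual preparation work is elided.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct ArchetypesGeneration(u64);

struct Stage {
    last_archetypes_generation: ArchetypesGeneration,
}

impl Stage {
    /// Returns true when per-system state had to be rebuilt.
    fn prepare(&mut self, schedule_changed: bool, current: ArchetypesGeneration) -> bool {
        let archetypes_changed = self.last_archetypes_generation != current;
        let rebuild = schedule_changed || archetypes_changed;
        if rebuild {
            // ...refresh per-system archetype access and recompute dependencies here...
        }
        // Record the generation we prepared against, unconditionally, as the diff does.
        self.last_archetypes_generation = current;
        rebuild
    }
}

fn main() {
    let mut stage = Stage { last_archetypes_generation: ArchetypesGeneration(u64::MAX) };
    assert!(stage.prepare(false, ArchetypesGeneration(0)));  // first call always rebuilds
    assert!(!stage.prepare(false, ArchetypesGeneration(0))); // nothing changed: cache reused
    assert!(stage.prepare(false, ArchetypesGeneration(1)));  // new archetypes: rebuild
}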
@@ -316,7 +326,9 @@ impl ExecutorStage {
 #[cfg(test)]
 mod tests {
     use super::ParallelExecutor;
-    use crate::{IntoQuerySystem, IntoThreadLocalSystem, Query, Res, Resources, Schedule, World, ResMut};
+    use crate::{
+        IntoQuerySystem, IntoThreadLocalSystem, Query, Res, ResMut, Resources, Schedule, World,
+    };
     use fixedbitset::FixedBitSet;
     use std::sync::{Arc, Mutex};
@@ -346,37 +358,25 @@
         fn read_u32(counter: Res<Counter>, _query: Query<&u32>) {
             let mut count = counter.count.lock().unwrap();
-            assert!(
-                *count < 2,
-                "should be one of the first two systems to run"
-            );
+            assert!(*count < 2, "should be one of the first two systems to run");
             *count += 1;
         }

         fn write_float(counter: Res<Counter>, _query: Query<&f32>) {
             let mut count = counter.count.lock().unwrap();
-            assert!(
-                *count < 2,
-                "should be one of the first two systems to run"
-            );
+            assert!(*count < 2, "should be one of the first two systems to run");
             *count += 1;
         }

         fn read_u32_write_u64(counter: Res<Counter>, _query: Query<(&u32, &mut u64)>) {
             let mut count = counter.count.lock().unwrap();
-            assert_eq!(
-                *count, 2,
-                "should always be the 3rd system to run"
-            );
+            assert_eq!(*count, 2, "should always be the 3rd system to run");
             *count += 1;
         }

         fn read_u64(counter: Res<Counter>, _query: Query<&u64>) {
             let mut count = counter.count.lock().unwrap();
-            assert_eq!(
-                *count, 3,
-                "should always be the 4th system to run"
-            );
+            assert_eq!(*count, 3, "should always be the 4th system to run");
             *count += 1;
         }
@@ -384,34 +384,25 @@
         schedule.add_system_to_stage("A", write_float.system());
         schedule.add_system_to_stage("A", read_u32_write_u64.system());
         schedule.add_system_to_stage("A", read_u64.system());

         // B systems
         fn write_u64(counter: Res<Counter>, _query: Query<&mut u64>) {
             let mut count = counter.count.lock().unwrap();
-            assert_eq!(
-                *count, 4,
-                "should always be the 5th system to run"
-            );
+            assert_eq!(*count, 4, "should always be the 5th system to run");
             *count += 1;
         }

         fn thread_local_system(_world: &mut World, resources: &mut Resources) {
             let counter = resources.get::<Counter>().unwrap();
             let mut count = counter.count.lock().unwrap();
-            assert_eq!(
-                *count, 5,
-                "should always be the 6th system to run"
-            );
+            assert_eq!(*count, 5, "should always be the 6th system to run");
             *count += 1;
         }

         fn write_f32(counter: Res<Counter>, _query: Query<&mut f32>) {
             let mut count = counter.count.lock().unwrap();
-            assert_eq!(
-                *count, 6,
-                "should always be the 7th system to run"
-            );
+            assert_eq!(*count, 6, "should always be the 7th system to run");
             *count += 1;
         }
@@ -419,7 +410,6 @@
         schedule.add_system_to_stage("B", thread_local_system.thread_local_system());
         schedule.add_system_to_stage("B", write_f32.system());
-
         // C systems
         fn read_f64_res(counter: Res<Counter>, _f64_res: Res<f64>) {
@@ -440,21 +430,19 @@
             *count += 1;
         }

-        fn read_isize_write_f64_res(counter: Res<Counter>, _isize_res: Res<isize>, _f64_res: ResMut<f64>) {
+        fn read_isize_write_f64_res(
+            counter: Res<Counter>,
+            _isize_res: Res<isize>,
+            _f64_res: ResMut<f64>,
+        ) {
             let mut count = counter.count.lock().unwrap();
-            assert_eq!(
-                *count, 9,
-                "should always be the 10th system to run"
-            );
+            assert_eq!(*count, 9, "should always be the 10th system to run");
             *count += 1;
         }

         fn write_f64_res(counter: Res<Counter>, _f64_res: ResMut<f64>) {
             let mut count = counter.count.lock().unwrap();
-            assert_eq!(
-                *count, 10,
-                "should always be the 11th system to run"
-            );
+            assert_eq!(*count, 10, "should always be the 11th system to run");
             *count += 1;
         }
@@ -470,7 +458,7 @@
         resources: &mut Resources,
     ) {
         executor.prepare(schedule, world);

         assert_eq!(
             executor.stages[0].system_dependents,
             vec![vec![2], vec![], vec![3], vec![]]
@@ -483,13 +471,13 @@
             executor.stages[2].system_dependents,
             vec![vec![2], vec![2], vec![3], vec![]]
         );

         let stage_0_len = executor.stages[0].system_dependencies.len();
         let mut read_u32_write_u64_deps = FixedBitSet::with_capacity(stage_0_len);
         read_u32_write_u64_deps.insert(0);
         let mut read_u64_deps = FixedBitSet::with_capacity(stage_0_len);
         read_u64_deps.insert(2);
         assert_eq!(
             executor.stages[0].system_dependencies,
             vec![
@@ -499,7 +487,7 @@
                 read_u64_deps,
             ]
         );

         let stage_1_len = executor.stages[1].system_dependencies.len();
         let mut thread_local_deps = FixedBitSet::with_capacity(stage_1_len);
         thread_local_deps.insert(0);
@@ -528,9 +516,9 @@
                 write_f64_res_deps
             ]
         );

         executor.run(schedule, world, resources);
         let counter = resources.get::<Counter>().unwrap();
         assert_eq!(
             *counter.count.lock().unwrap(),