ecs: add thread local system support to parallel executor
This commit is contained in:
parent
98ed29aacc
commit
0dc810a37a
@ -20,10 +20,10 @@ impl AppPlugin for DiagnosticsPlugin {
|
|||||||
#[cfg(feature = "profiler")]
|
#[cfg(feature = "profiler")]
|
||||||
{
|
{
|
||||||
use bevy_ecs::IntoQuerySystem;
|
use bevy_ecs::IntoQuerySystem;
|
||||||
app.add_resource_ecs::<Box<dyn bevy_ecs::profiler::Profiler>>(Box::new(
|
app.add_resource::<Box<dyn bevy_ecs::profiler::Profiler>>(Box::new(
|
||||||
system_profiler::SystemProfiler::default(),
|
system_profiler::SystemProfiler::default(),
|
||||||
))
|
))
|
||||||
.add_system_to_stage_ecs(
|
.add_system_to_stage(
|
||||||
bevy_app::stage::LAST,
|
bevy_app::stage::LAST,
|
||||||
system_profiler::profiler_diagnostic_system.system(),
|
system_profiler::profiler_diagnostic_system.system(),
|
||||||
);
|
);
|
||||||
|
@ -296,7 +296,7 @@ impl Commands {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn apply(self, world: &mut World, resources: &mut Resources) {
|
pub fn apply(&self, world: &mut World, resources: &mut Resources) {
|
||||||
let mut commands = self.commands.lock().unwrap();
|
let mut commands = self.commands.lock().unwrap();
|
||||||
for command in commands.commands.drain(..) {
|
for command in commands.commands.drain(..) {
|
||||||
match command {
|
match command {
|
||||||
|
@ -65,6 +65,16 @@ impl Default for ExecutorStage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum RunReadyResult {
|
||||||
|
Ok,
|
||||||
|
ThreadLocalReady(usize),
|
||||||
|
}
|
||||||
|
|
||||||
|
enum RunReadyType {
|
||||||
|
All,
|
||||||
|
Dependents(usize),
|
||||||
|
}
|
||||||
|
|
||||||
impl ExecutorStage {
|
impl ExecutorStage {
|
||||||
pub fn prepare(&mut self, world: &World, systems: &Vec<Arc<Mutex<Box<dyn System>>>>) {
|
pub fn prepare(&mut self, world: &World, systems: &Vec<Arc<Mutex<Box<dyn System>>>>) {
|
||||||
self.system_dependencies = vec![FixedBitSet::with_capacity(systems.len()); systems.len()];
|
self.system_dependencies = vec![FixedBitSet::with_capacity(systems.len()); systems.len()];
|
||||||
@ -79,17 +89,25 @@ impl ExecutorStage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut current_archetype_access = ArchetypeAccess::default();
|
let mut current_archetype_access = ArchetypeAccess::default();
|
||||||
|
let mut last_thread_local_index: Option<usize> = None;
|
||||||
for (system_index, system) in systems.iter().enumerate() {
|
for (system_index, system) in systems.iter().enumerate() {
|
||||||
let system = system.lock().unwrap();
|
let system = system.lock().unwrap();
|
||||||
if let Some(archetype_access) = system.get_archetype_access() {
|
let archetype_access = system.get_archetype_access();
|
||||||
// TODO: check if thread local and add full sync
|
match system.thread_local_execution() {
|
||||||
// if any system before this one conflicts, check all systems that came before for compatibility
|
ThreadLocalExecution::NextFlush => {
|
||||||
if current_archetype_access.is_compatible(archetype_access) == false {
|
// if any system before this one conflicts, check all systems that came before for compatibility
|
||||||
for earlier_system_index in 0..system_index {
|
if current_archetype_access.is_compatible(archetype_access) == false {
|
||||||
let earlier_system = systems[earlier_system_index].lock().unwrap();
|
for earlier_system_index in 0..system_index {
|
||||||
if let Some(earlier_archetype_access) =
|
let earlier_system = systems[earlier_system_index].lock().unwrap();
|
||||||
earlier_system.get_archetype_access()
|
|
||||||
{
|
// ignore "immediate" thread local systems, we handle them separately
|
||||||
|
if let ThreadLocalExecution::Immediate =
|
||||||
|
earlier_system.thread_local_execution()
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let earlier_archetype_access = earlier_system.get_archetype_access();
|
||||||
// if earlier system is incompatible, make the current system dependent
|
// if earlier system is incompatible, make the current system dependent
|
||||||
if earlier_archetype_access.is_compatible(archetype_access) == false {
|
if earlier_archetype_access.is_compatible(archetype_access) == false {
|
||||||
self.system_dependents[earlier_system_index].push(system_index);
|
self.system_dependents[earlier_system_index].push(system_index);
|
||||||
@ -97,62 +115,87 @@ impl ExecutorStage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
current_archetype_access.union(archetype_access);
|
current_archetype_access.union(archetype_access);
|
||||||
|
|
||||||
|
if let Some(last_thread_local_index) = last_thread_local_index {
|
||||||
|
self.system_dependents[last_thread_local_index].push(system_index);
|
||||||
|
self.system_dependencies[system_index].insert(last_thread_local_index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ThreadLocalExecution::Immediate => {
|
||||||
|
last_thread_local_index = Some(system_index);
|
||||||
|
for earlier_system_index in 0..system_index {
|
||||||
|
// treat all earlier systems as "incompatible" to ensure we run this thread local system exclusively
|
||||||
|
self.system_dependents[earlier_system_index].push(system_index);
|
||||||
|
self.system_dependencies[system_index].insert(earlier_system_index);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run_ready_systems<'run>(
|
fn run_ready_systems<'run>(
|
||||||
&mut self,
|
&mut self,
|
||||||
systems: &[Arc<Mutex<Box<dyn System>>>],
|
systems: &[Arc<Mutex<Box<dyn System>>>],
|
||||||
|
run_ready_type: RunReadyType,
|
||||||
scope: &ScopeFifo<'run>,
|
scope: &ScopeFifo<'run>,
|
||||||
world: &'run World,
|
world: &'run World,
|
||||||
resources: &'run Resources,
|
resources: &'run Resources,
|
||||||
) {
|
) -> RunReadyResult {
|
||||||
for i in 0..systems.len() {
|
// produce a system index iterator based on the passed in RunReadyType
|
||||||
Self::try_run_system(
|
let mut all;
|
||||||
systems,
|
let mut dependents;
|
||||||
i,
|
let system_index_iter: &mut dyn Iterator<Item = usize> = match run_ready_type {
|
||||||
&mut self.running_systems,
|
RunReadyType::All => {
|
||||||
&self.finished_systems,
|
all = 0..systems.len();
|
||||||
&self.system_dependencies,
|
&mut all
|
||||||
&self.sender,
|
}
|
||||||
scope,
|
RunReadyType::Dependents(system_index) => {
|
||||||
world,
|
dependents = self.system_dependents[system_index].iter().cloned();
|
||||||
resources,
|
&mut dependents
|
||||||
);
|
}
|
||||||
}
|
};
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
let mut systems_currently_running = false;
|
||||||
pub fn try_run_system<'run>(
|
for system_index in system_index_iter {
|
||||||
systems: &[Arc<Mutex<Box<dyn System>>>],
|
// if this system has already been run, don't try to run it again
|
||||||
system_index: usize,
|
if self.running_systems.contains(system_index) {
|
||||||
running_systems: &mut FixedBitSet,
|
continue;
|
||||||
finished_systems: &FixedBitSet,
|
}
|
||||||
system_dependencies: &[FixedBitSet],
|
|
||||||
sender: &Sender<usize>,
|
// if all system dependencies are finished, queue up the system to run
|
||||||
scope: &ScopeFifo<'run>,
|
if self.system_dependencies[system_index].is_subset(&self.finished_systems) {
|
||||||
world: &'run World,
|
let system = systems[system_index].clone();
|
||||||
resources: &'run Resources,
|
|
||||||
) {
|
// handle thread local system
|
||||||
if running_systems.contains(system_index) {
|
{
|
||||||
return;
|
let system = system.lock().unwrap();
|
||||||
|
if let ThreadLocalExecution::Immediate = system.thread_local_execution() {
|
||||||
|
if systems_currently_running {
|
||||||
|
// if systems are currently running, we can't run this thread local system yet
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
// if no systems are running, return this thread local system to be run exclusively
|
||||||
|
return RunReadyResult::ThreadLocalReady(system_index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle multi-threaded system
|
||||||
|
let sender = self.sender.clone();
|
||||||
|
self.running_systems.insert(system_index);
|
||||||
|
scope.spawn_fifo(move |_| {
|
||||||
|
let mut system = system.lock().unwrap();
|
||||||
|
system.run(world, resources);
|
||||||
|
sender.send(system_index).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
systems_currently_running = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if all system dependencies are finished, queue up the system to run
|
RunReadyResult::Ok
|
||||||
if system_dependencies[system_index].is_subset(&finished_systems) {
|
|
||||||
let system = systems[system_index].clone();
|
|
||||||
let sender = sender.clone();
|
|
||||||
running_systems.insert(system_index);
|
|
||||||
scope.spawn_fifo(move |_| {
|
|
||||||
let mut system = system.lock().unwrap();
|
|
||||||
system.run(world, resources);
|
|
||||||
sender.send(system_index).unwrap();
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(
|
pub fn run(
|
||||||
@ -163,46 +206,60 @@ impl ExecutorStage {
|
|||||||
) {
|
) {
|
||||||
self.finished_systems.clear();
|
self.finished_systems.clear();
|
||||||
self.running_systems.clear();
|
self.running_systems.clear();
|
||||||
{
|
let mut run_ready_result = RunReadyResult::Ok;
|
||||||
let world = &*world;
|
rayon::scope_fifo(|scope| {
|
||||||
let resources = &*resources;
|
run_ready_result =
|
||||||
|
self.run_ready_systems(systems, RunReadyType::All, scope, world, resources);
|
||||||
|
});
|
||||||
|
loop {
|
||||||
|
// if all systems in the stage are finished, break out of the loop
|
||||||
|
if self.finished_systems.count_ones(..) == systems.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
rayon::scope_fifo(move |scope| {
|
if let RunReadyResult::ThreadLocalReady(thread_local_index) = run_ready_result {
|
||||||
self.run_ready_systems(systems, scope, world, resources);
|
// if a thread local system is ready to run, run it exclusively on the main thread
|
||||||
loop {
|
let mut system = systems[thread_local_index].lock().unwrap();
|
||||||
if self.finished_systems.count_ones(..) == systems.len() {
|
self.running_systems.insert(thread_local_index);
|
||||||
break;
|
system.run(world, resources);
|
||||||
}
|
system.run_thread_local(world, resources);
|
||||||
|
self.finished_systems.insert(thread_local_index);
|
||||||
|
self.sender.send(thread_local_index).unwrap();
|
||||||
|
run_ready_result = RunReadyResult::Ok;
|
||||||
|
} else {
|
||||||
|
// wait for a system to finish, then run its dependents
|
||||||
|
rayon::scope_fifo(|scope| {
|
||||||
|
loop {
|
||||||
|
// if all systems in the stage are finished, break out of the loop
|
||||||
|
if self.finished_systems.count_ones(..) == systems.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
let finished_system = self.receiver.recv().unwrap();
|
let finished_system = self.receiver.recv().unwrap();
|
||||||
self.finished_systems.insert(finished_system);
|
self.finished_systems.insert(finished_system);
|
||||||
for dependent_system in self.system_dependents[finished_system].iter() {
|
run_ready_result = self.run_ready_systems(
|
||||||
Self::try_run_system(
|
|
||||||
systems,
|
systems,
|
||||||
*dependent_system,
|
RunReadyType::Dependents(finished_system),
|
||||||
&mut self.running_systems,
|
|
||||||
&self.finished_systems,
|
|
||||||
&self.system_dependencies,
|
|
||||||
&self.sender,
|
|
||||||
scope,
|
scope,
|
||||||
world,
|
world,
|
||||||
resources,
|
resources,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// if the next ready system is thread local, break out of this loop/rayon scope so it can be run
|
||||||
|
if let RunReadyResult::ThreadLocalReady(_) = run_ready_result {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// "flush"
|
// "flush"
|
||||||
// NOTE: when this is made parallel a full sync is required here
|
|
||||||
for system in systems.iter() {
|
for system in systems.iter() {
|
||||||
let mut system = system.lock().unwrap();
|
let mut system = system.lock().unwrap();
|
||||||
match system.thread_local_execution() {
|
match system.thread_local_execution() {
|
||||||
ThreadLocalExecution::NextFlush => system.run_thread_local(world, resources),
|
ThreadLocalExecution::NextFlush => system.run_thread_local(world, resources),
|
||||||
ThreadLocalExecution::Immediate => {
|
ThreadLocalExecution::Immediate => { /* already ran */ }
|
||||||
// TODO: this should happen immediately after thread local systems
|
|
||||||
system.run_thread_local(world, resources)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -227,7 +284,7 @@ impl ArchetypeAccess {
|
|||||||
self.immutable.union_with(&other.immutable);
|
self.immutable.union_with(&other.immutable);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_bits_for_query<Q>(&mut self, world: &World)
|
pub fn set_access_for_query<Q>(&mut self, world: &World)
|
||||||
where
|
where
|
||||||
Q: Query,
|
Q: Query,
|
||||||
{
|
{
|
||||||
@ -251,7 +308,7 @@ impl ArchetypeAccess {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::Executor;
|
use super::Executor;
|
||||||
use crate::{IntoQuerySystem, Query, Res, Resources, Schedule, World};
|
use crate::{IntoQuerySystem, IntoThreadLocalSystem, Query, Res, Resources, Schedule, World};
|
||||||
use fixedbitset::FixedBitSet;
|
use fixedbitset::FixedBitSet;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
@ -264,7 +321,7 @@ mod tests {
|
|||||||
let mut count = counter.count.lock().unwrap();
|
let mut count = counter.count.lock().unwrap();
|
||||||
assert!(
|
assert!(
|
||||||
*count < 2,
|
*count < 2,
|
||||||
"read_32 should run be one of the first two systems to run"
|
"read_32 should be one of the first two systems to run"
|
||||||
);
|
);
|
||||||
*count += 1;
|
*count += 1;
|
||||||
}
|
}
|
||||||
@ -273,7 +330,7 @@ mod tests {
|
|||||||
let mut count = counter.count.lock().unwrap();
|
let mut count = counter.count.lock().unwrap();
|
||||||
assert!(
|
assert!(
|
||||||
*count < 2,
|
*count < 2,
|
||||||
"write_float should run be one of the first two systems to run"
|
"write_float should be one of the first two systems to run"
|
||||||
);
|
);
|
||||||
*count += 1;
|
*count += 1;
|
||||||
}
|
}
|
||||||
@ -305,6 +362,25 @@ mod tests {
|
|||||||
*count += 1;
|
*count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn thread_local_system(_world: &mut World, resources: &mut Resources) {
|
||||||
|
let counter = resources.get::<Counter>().unwrap();
|
||||||
|
let mut count = counter.count.lock().unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
*count, 5,
|
||||||
|
"thread_local_system should always be the sixth system to run"
|
||||||
|
);
|
||||||
|
*count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_f32(counter: Res<Counter>, _query: Query<&mut f32>) {
|
||||||
|
let mut count = counter.count.lock().unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
*count, 6,
|
||||||
|
"write_f32 should always be the seventh system to run"
|
||||||
|
);
|
||||||
|
*count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn schedule() {
|
fn schedule() {
|
||||||
let mut world = World::new();
|
let mut world = World::new();
|
||||||
@ -324,15 +400,27 @@ mod tests {
|
|||||||
schedule.add_system_to_stage("A", read_u32_write_u64.system());
|
schedule.add_system_to_stage("A", read_u32_write_u64.system());
|
||||||
schedule.add_system_to_stage("A", read_u64.system());
|
schedule.add_system_to_stage("A", read_u64.system());
|
||||||
schedule.add_system_to_stage("B", write_u64.system());
|
schedule.add_system_to_stage("B", write_u64.system());
|
||||||
|
schedule.add_system_to_stage("B", thread_local_system.thread_local_system());
|
||||||
|
schedule.add_system_to_stage("B", write_f32.system());
|
||||||
|
|
||||||
let mut executor = Executor::default();
|
let mut executor = Executor::default();
|
||||||
executor.prepare(&mut schedule, &world);
|
run_executor_and_validate(&mut executor, &mut schedule, &mut world, &mut resources);
|
||||||
|
// run again (with counter reset) to ensure executor works correctly across runs
|
||||||
|
*resources.get::<Counter>().unwrap().count.lock().unwrap() = 0;
|
||||||
|
run_executor_and_validate(&mut executor, &mut schedule, &mut world, &mut resources);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_executor_and_validate(executor: &mut Executor, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) {
|
||||||
|
executor.prepare(schedule, world);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
executor.stages[0].system_dependents,
|
executor.stages[0].system_dependents,
|
||||||
vec![vec![2], vec![], vec![3], vec![]]
|
vec![vec![2], vec![], vec![3], vec![]]
|
||||||
);
|
);
|
||||||
assert_eq!(executor.stages[1].system_dependents, vec![vec![]]);
|
assert_eq!(
|
||||||
|
executor.stages[1].system_dependents,
|
||||||
|
vec![vec![1], vec![2], vec![]]
|
||||||
|
);
|
||||||
|
|
||||||
let stage_0_len = executor.stages[0].system_dependencies.len();
|
let stage_0_len = executor.stages[0].system_dependencies.len();
|
||||||
let mut read_u32_write_u64_deps = FixedBitSet::with_capacity(stage_0_len);
|
let mut read_u32_write_u64_deps = FixedBitSet::with_capacity(stage_0_len);
|
||||||
@ -350,19 +438,26 @@ mod tests {
|
|||||||
]
|
]
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let stage_1_len = executor.stages[1].system_dependencies.len();
|
||||||
|
let mut thread_local_deps = FixedBitSet::with_capacity(stage_1_len);
|
||||||
|
thread_local_deps.insert(0);
|
||||||
|
let mut write_f64_deps = FixedBitSet::with_capacity(stage_1_len);
|
||||||
|
write_f64_deps.insert(1);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
executor.stages[1].system_dependencies,
|
executor.stages[1].system_dependencies,
|
||||||
vec![
|
vec![
|
||||||
FixedBitSet::with_capacity(1),
|
FixedBitSet::with_capacity(stage_1_len),
|
||||||
|
thread_local_deps,
|
||||||
|
write_f64_deps
|
||||||
]
|
]
|
||||||
);
|
);
|
||||||
|
|
||||||
executor.run(&mut schedule, &mut world, &mut resources);
|
executor.run(schedule, world, resources);
|
||||||
|
|
||||||
let counter = resources.get::<Counter>().unwrap();
|
let counter = resources.get::<Counter>().unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
*counter.count.lock().unwrap(),
|
*counter.count.lock().unwrap(),
|
||||||
5,
|
7,
|
||||||
"counter should have been incremented once for each system"
|
"counter should have been incremented once for each system"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
|
executor::ArchetypeAccess,
|
||||||
resource_query::{FetchResource, ResourceQuery, UnsafeClone},
|
resource_query::{FetchResource, ResourceQuery, UnsafeClone},
|
||||||
system::{System, SystemId, ThreadLocalExecution},
|
system::{System, SystemId, ThreadLocalExecution},
|
||||||
Commands, Resources, executor::ArchetypeAccess,
|
Commands, Resources,
|
||||||
};
|
};
|
||||||
use core::marker::PhantomData;
|
use core::marker::PhantomData;
|
||||||
use hecs::{
|
use hecs::{
|
||||||
@ -9,26 +10,27 @@ use hecs::{
|
|||||||
};
|
};
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
|
|
||||||
pub struct SystemFn<F, Init, SetArchetypeAccess>
|
pub struct SystemFn<F, ThreadLocalF, Init, SetArchetypeAccess>
|
||||||
where
|
where
|
||||||
F: FnMut(Commands, &World, &Resources) + Send + Sync,
|
F: FnMut(&World, &Resources) + Send + Sync,
|
||||||
|
ThreadLocalF: FnMut(&mut World, &mut Resources) + Send + Sync,
|
||||||
Init: FnMut(&mut Resources) + Send + Sync,
|
Init: FnMut(&mut Resources) + Send + Sync,
|
||||||
SetArchetypeAccess: FnMut(&World, &mut ArchetypeAccess) + Send + Sync,
|
SetArchetypeAccess: FnMut(&World, &mut ArchetypeAccess) + Send + Sync,
|
||||||
{
|
{
|
||||||
pub func: F,
|
pub func: F,
|
||||||
|
pub thread_local_func: ThreadLocalF,
|
||||||
pub init_func: Init,
|
pub init_func: Init,
|
||||||
pub commands: Commands,
|
|
||||||
pub thread_local_execution: ThreadLocalExecution,
|
pub thread_local_execution: ThreadLocalExecution,
|
||||||
pub name: Cow<'static, str>,
|
pub name: Cow<'static, str>,
|
||||||
pub id: SystemId,
|
pub id: SystemId,
|
||||||
pub archetype_access: ArchetypeAccess,
|
pub archetype_access: ArchetypeAccess,
|
||||||
pub set_archetype_access: SetArchetypeAccess,
|
pub set_archetype_access: SetArchetypeAccess,
|
||||||
// TODO: add dependency info here
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, Init, SetArchetypeAccess> System for SystemFn<F, Init, SetArchetypeAccess>
|
impl<F, ThreadLocalF, Init, SetArchetypeAccess> System for SystemFn<F, ThreadLocalF, Init, SetArchetypeAccess>
|
||||||
where
|
where
|
||||||
F: FnMut(Commands, &World, &Resources) + Send + Sync,
|
F: FnMut(&World, &Resources) + Send + Sync,
|
||||||
|
ThreadLocalF: FnMut(&mut World, &mut Resources) + Send + Sync,
|
||||||
Init: FnMut(&mut Resources) + Send + Sync,
|
Init: FnMut(&mut Resources) + Send + Sync,
|
||||||
SetArchetypeAccess: FnMut(&World, &mut ArchetypeAccess) + Send + Sync,
|
SetArchetypeAccess: FnMut(&World, &mut ArchetypeAccess) + Send + Sync,
|
||||||
{
|
{
|
||||||
@ -40,8 +42,8 @@ where
|
|||||||
(self.set_archetype_access)(world, &mut self.archetype_access);
|
(self.set_archetype_access)(world, &mut self.archetype_access);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_archetype_access(&self) -> Option<&ArchetypeAccess> {
|
fn get_archetype_access(&self) -> &ArchetypeAccess {
|
||||||
Some(&self.archetype_access)
|
&self.archetype_access
|
||||||
}
|
}
|
||||||
|
|
||||||
fn thread_local_execution(&self) -> ThreadLocalExecution {
|
fn thread_local_execution(&self) -> ThreadLocalExecution {
|
||||||
@ -49,12 +51,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn run(&mut self, world: &World, resources: &Resources) {
|
fn run(&mut self, world: &World, resources: &Resources) {
|
||||||
(self.func)(self.commands.clone(), world, resources);
|
(self.func)(world, resources);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_thread_local(&mut self, world: &mut World, resources: &mut Resources) {
|
fn run_thread_local(&mut self, world: &mut World, resources: &mut Resources) {
|
||||||
let commands = core::mem::replace(&mut self.commands, Commands::default());
|
(self.thread_local_func)(world, resources);
|
||||||
commands.apply(world, resources);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn initialize(&mut self, resources: &mut Resources) {
|
fn initialize(&mut self, resources: &mut Resources) {
|
||||||
@ -89,12 +90,13 @@ macro_rules! impl_into_foreach_system {
|
|||||||
#[allow(unused_unsafe)]
|
#[allow(unused_unsafe)]
|
||||||
fn system(mut self) -> Box<dyn System> {
|
fn system(mut self) -> Box<dyn System> {
|
||||||
let id = SystemId::new();
|
let id = SystemId::new();
|
||||||
|
let commands = Commands::default();
|
||||||
|
let thread_local_commands = commands.clone();
|
||||||
Box::new(SystemFn {
|
Box::new(SystemFn {
|
||||||
commands: Commands::default(),
|
|
||||||
thread_local_execution: ThreadLocalExecution::NextFlush,
|
thread_local_execution: ThreadLocalExecution::NextFlush,
|
||||||
name: core::any::type_name::<Self>().into(),
|
name: core::any::type_name::<Self>().into(),
|
||||||
id,
|
id,
|
||||||
func: move |commands, world, resources| {
|
func: move |world, resources| {
|
||||||
<<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::borrow(&resources.resource_archetypes);
|
<<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::borrow(&resources.resource_archetypes);
|
||||||
{
|
{
|
||||||
let ($($resource,)*) = resources.query_system::<($($resource,)*)>(id);
|
let ($($resource,)*) = resources.query_system::<($($resource,)*)>(id);
|
||||||
@ -104,13 +106,16 @@ macro_rules! impl_into_foreach_system {
|
|||||||
}
|
}
|
||||||
<<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::release(&resources.resource_archetypes);
|
<<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::release(&resources.resource_archetypes);
|
||||||
},
|
},
|
||||||
|
thread_local_func: move |world, resources| {
|
||||||
|
thread_local_commands.apply(world, resources);
|
||||||
|
},
|
||||||
init_func: move |resources| {
|
init_func: move |resources| {
|
||||||
<($($resource,)*)>::initialize(resources, Some(id));
|
<($($resource,)*)>::initialize(resources, Some(id));
|
||||||
},
|
},
|
||||||
archetype_access: ArchetypeAccess::default(),
|
archetype_access: ArchetypeAccess::default(),
|
||||||
set_archetype_access: |world, archetype_access| {
|
set_archetype_access: |world, archetype_access| {
|
||||||
for archetype in world.archetypes() {
|
for archetype in world.archetypes() {
|
||||||
archetype_access.set_bits_for_query::<($($component,)*)>(world);
|
archetype_access.set_access_for_query::<($($component,)*)>(world);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -162,12 +167,13 @@ macro_rules! impl_into_query_system {
|
|||||||
#[allow(unused_unsafe)]
|
#[allow(unused_unsafe)]
|
||||||
fn system(mut self) -> Box<dyn System> {
|
fn system(mut self) -> Box<dyn System> {
|
||||||
let id = SystemId::new();
|
let id = SystemId::new();
|
||||||
|
let commands = Commands::default();
|
||||||
|
let thread_local_commands = commands.clone();
|
||||||
Box::new(SystemFn {
|
Box::new(SystemFn {
|
||||||
commands: Commands::default(),
|
|
||||||
thread_local_execution: ThreadLocalExecution::NextFlush,
|
thread_local_execution: ThreadLocalExecution::NextFlush,
|
||||||
id,
|
id,
|
||||||
name: core::any::type_name::<Self>().into(),
|
name: core::any::type_name::<Self>().into(),
|
||||||
func: move |commands, world, resources| {
|
func: move |world, resources| {
|
||||||
<<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::borrow(&resources.resource_archetypes);
|
<<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::borrow(&resources.resource_archetypes);
|
||||||
{
|
{
|
||||||
let ($($resource,)*) = resources.query_system::<($($resource,)*)>(id);
|
let ($($resource,)*) = resources.query_system::<($($resource,)*)>(id);
|
||||||
@ -180,13 +186,16 @@ macro_rules! impl_into_query_system {
|
|||||||
}
|
}
|
||||||
<<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::release(&resources.resource_archetypes);
|
<<($($resource,)*) as ResourceQuery>::Fetch as FetchResource>::release(&resources.resource_archetypes);
|
||||||
},
|
},
|
||||||
|
thread_local_func: move |world, resources| {
|
||||||
|
thread_local_commands.apply(world, resources);
|
||||||
|
},
|
||||||
init_func: move |resources| {
|
init_func: move |resources| {
|
||||||
<($($resource,)*)>::initialize(resources, Some(id));
|
<($($resource,)*)>::initialize(resources, Some(id));
|
||||||
},
|
},
|
||||||
archetype_access: ArchetypeAccess::default(),
|
archetype_access: ArchetypeAccess::default(),
|
||||||
set_archetype_access: |world, archetype_access| {
|
set_archetype_access: |world, archetype_access| {
|
||||||
for archetype in world.archetypes() {
|
for archetype in world.archetypes() {
|
||||||
$(archetype_access.set_bits_for_query::<$query>(world);)*
|
$(archetype_access.set_access_for_query::<$query>(world);)*
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -281,27 +290,27 @@ impl_into_systems!(Ra,Rb,Rc,Rd,Re,Rf,Rg,Rh,Ri);
|
|||||||
#[rustfmt::skip]
|
#[rustfmt::skip]
|
||||||
impl_into_systems!(Ra,Rb,Rc,Rd,Re,Rf,Rg,Rh,Ri,Rj);
|
impl_into_systems!(Ra,Rb,Rc,Rd,Re,Rf,Rg,Rh,Ri,Rj);
|
||||||
|
|
||||||
pub struct ThreadLocalSystem<F>
|
pub trait IntoThreadLocalSystem {
|
||||||
where
|
fn thread_local_system(self) -> Box<dyn System>;
|
||||||
F: ThreadLocalSystemFn,
|
|
||||||
{
|
|
||||||
func: F,
|
|
||||||
name: Cow<'static, str>,
|
|
||||||
id: SystemId,
|
|
||||||
// TODO: add dependency info here
|
|
||||||
}
|
}
|
||||||
impl<F> ThreadLocalSystem<F>
|
|
||||||
where
|
impl<F> IntoThreadLocalSystem for F where F: ThreadLocalSystemFn {
|
||||||
F: ThreadLocalSystemFn,
|
fn thread_local_system(mut self) -> Box<dyn System> {
|
||||||
{
|
Box::new(SystemFn {
|
||||||
pub fn new(func: F) -> Box<dyn System> {
|
thread_local_func: move |world, resources| {
|
||||||
Box::new(Self {
|
self.run(world, resources);
|
||||||
func,
|
},
|
||||||
|
func: |_, _| {},
|
||||||
|
init_func: |_| {},
|
||||||
|
set_archetype_access: |_, _| {},
|
||||||
|
thread_local_execution: ThreadLocalExecution::Immediate,
|
||||||
name: core::any::type_name::<F>().into(),
|
name: core::any::type_name::<F>().into(),
|
||||||
id: SystemId::new(),
|
id: SystemId::new(),
|
||||||
|
archetype_access: ArchetypeAccess::default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ThreadLocalSystemFn: Send + Sync + 'static {
|
pub trait ThreadLocalSystemFn: Send + Sync + 'static {
|
||||||
fn run(&mut self, world: &mut World, resource: &mut Resources);
|
fn run(&mut self, world: &mut World, resource: &mut Resources);
|
||||||
}
|
}
|
||||||
@ -313,31 +322,4 @@ where
|
|||||||
fn run(&mut self, world: &mut World, resources: &mut Resources) {
|
fn run(&mut self, world: &mut World, resources: &mut Resources) {
|
||||||
self(world, resources);
|
self(world, resources);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F> System for ThreadLocalSystem<F>
|
|
||||||
where
|
|
||||||
F: ThreadLocalSystemFn + Send + Sync,
|
|
||||||
{
|
|
||||||
fn name(&self) -> Cow<'static, str> {
|
|
||||||
self.name.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn thread_local_execution(&self) -> ThreadLocalExecution {
|
|
||||||
ThreadLocalExecution::Immediate
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run(&mut self, _world: &World, _resources: &Resources) {}
|
|
||||||
|
|
||||||
fn run_thread_local(&mut self, world: &mut World, resources: &mut Resources) {
|
|
||||||
self.func.run(world, resources);
|
|
||||||
}
|
|
||||||
fn id(&self) -> SystemId {
|
|
||||||
self.id
|
|
||||||
}
|
|
||||||
fn update_archetype_access(&mut self, _world: &World) {
|
|
||||||
}
|
|
||||||
fn get_archetype_access(&self) -> Option<&ArchetypeAccess> {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
@ -12,7 +12,7 @@ mod system;
|
|||||||
mod world_builder;
|
mod world_builder;
|
||||||
|
|
||||||
pub use commands::{Commands, CommandsInternal};
|
pub use commands::{Commands, CommandsInternal};
|
||||||
pub use into_system::{IntoForEachSystem, IntoQuerySystem, Query, ThreadLocalSystem};
|
pub use into_system::{IntoForEachSystem, IntoQuerySystem, IntoThreadLocalSystem, Query};
|
||||||
pub use resource_query::{FetchResource, Local, Res, ResMut, ResourceQuery};
|
pub use resource_query::{FetchResource, Local, Res, ResMut, ResourceQuery};
|
||||||
pub use resources::{FromResources, Resource, Resources};
|
pub use resources::{FromResources, Resource, Resources};
|
||||||
pub use schedule::Schedule;
|
pub use schedule::Schedule;
|
||||||
|
@ -20,7 +20,7 @@ pub trait System: Send + Sync {
|
|||||||
fn name(&self) -> Cow<'static, str>;
|
fn name(&self) -> Cow<'static, str>;
|
||||||
fn id(&self) -> SystemId;
|
fn id(&self) -> SystemId;
|
||||||
fn update_archetype_access(&mut self, world: &World);
|
fn update_archetype_access(&mut self, world: &World);
|
||||||
fn get_archetype_access(&self) -> Option<&ArchetypeAccess>;
|
fn get_archetype_access(&self) -> &ArchetypeAccess;
|
||||||
fn thread_local_execution(&self) -> ThreadLocalExecution;
|
fn thread_local_execution(&self) -> ThreadLocalExecution;
|
||||||
fn run(&mut self, world: &World, resources: &Resources);
|
fn run(&mut self, world: &World, resources: &Resources);
|
||||||
fn run_thread_local(&mut self, world: &mut World, resources: &mut Resources);
|
fn run_thread_local(&mut self, world: &mut World, resources: &mut Resources);
|
||||||
|
@ -34,7 +34,7 @@ use self::{
|
|||||||
use base_render_graph::{BaseRenderGraphBuilder, BaseRenderGraphConfig};
|
use base_render_graph::{BaseRenderGraphBuilder, BaseRenderGraphConfig};
|
||||||
use bevy_app::{AppBuilder, AppPlugin};
|
use bevy_app::{AppBuilder, AppPlugin};
|
||||||
use bevy_asset::AddAsset;
|
use bevy_asset::AddAsset;
|
||||||
use bevy_ecs::{IntoQuerySystem, ThreadLocalSystem};
|
use bevy_ecs::{IntoQuerySystem, IntoThreadLocalSystem};
|
||||||
use bevy_type_registry::RegisterType;
|
use bevy_type_registry::RegisterType;
|
||||||
use draw::{clear_draw_system, Draw};
|
use draw::{clear_draw_system, Draw};
|
||||||
use mesh::mesh_resource_provider_system;
|
use mesh::mesh_resource_provider_system;
|
||||||
@ -124,7 +124,7 @@ impl AppPlugin for RenderPlugin {
|
|||||||
)
|
)
|
||||||
.add_system_to_stage(
|
.add_system_to_stage(
|
||||||
stage::RENDER_GRAPH_SYSTEMS,
|
stage::RENDER_GRAPH_SYSTEMS,
|
||||||
ThreadLocalSystem::new(render_graph_schedule_executor_system),
|
render_graph_schedule_executor_system.thread_local_system(),
|
||||||
)
|
)
|
||||||
.add_system_to_stage(stage::DRAW, draw_render_pipelines_system.system())
|
.add_system_to_stage(stage::DRAW, draw_render_pipelines_system.system())
|
||||||
.add_system_to_stage(stage::POST_RENDER, clear_shader_defs_system.system());
|
.add_system_to_stage(stage::POST_RENDER, clear_shader_defs_system.system());
|
||||||
|
@ -9,7 +9,7 @@ pub use scene_spawner::*;
|
|||||||
|
|
||||||
use bevy_app::{stage, AppBuilder, AppPlugin};
|
use bevy_app::{stage, AppBuilder, AppPlugin};
|
||||||
use bevy_asset::AddAsset;
|
use bevy_asset::AddAsset;
|
||||||
use bevy_ecs::ThreadLocalSystem;
|
use bevy_ecs::IntoThreadLocalSystem;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct ScenePlugin;
|
pub struct ScenePlugin;
|
||||||
@ -22,8 +22,6 @@ impl AppPlugin for ScenePlugin {
|
|||||||
.add_asset_loader::<Scene, SceneLoader>()
|
.add_asset_loader::<Scene, SceneLoader>()
|
||||||
.init_resource::<SceneSpawner>()
|
.init_resource::<SceneSpawner>()
|
||||||
.add_stage_after(stage::EVENT_UPDATE, SCENE_STAGE)
|
.add_stage_after(stage::EVENT_UPDATE, SCENE_STAGE)
|
||||||
.add_system_to_stage(SCENE_STAGE, ThreadLocalSystem::new(scene_spawner_system));
|
.add_system_to_stage(SCENE_STAGE, scene_spawner_system.thread_local_system());
|
||||||
// TODO: port scenes to bevy_ecs
|
|
||||||
// .add_system_to_stage_ecs(SCENE_STAGE, ThreadLocalSystem::new(scene_spawner_system));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ pub use wgpu_renderer::*;
|
|||||||
pub use wgpu_resources::*;
|
pub use wgpu_resources::*;
|
||||||
|
|
||||||
use bevy_app::{AppBuilder, AppPlugin};
|
use bevy_app::{AppBuilder, AppPlugin};
|
||||||
use bevy_ecs::{IntoQuerySystem, Resources, ThreadLocalSystem, World};
|
use bevy_ecs::{IntoQuerySystem, Resources, IntoThreadLocalSystem, World};
|
||||||
use bevy_render::{
|
use bevy_render::{
|
||||||
render_resource::{free_shared_buffers_system, SharedBuffers},
|
render_resource::{free_shared_buffers_system, SharedBuffers},
|
||||||
renderer::RenderResourceContext,
|
renderer::RenderResourceContext,
|
||||||
@ -25,7 +25,7 @@ impl AppPlugin for WgpuPlugin {
|
|||||||
let render_system = wgpu_render_system(app.resources_mut());
|
let render_system = wgpu_render_system(app.resources_mut());
|
||||||
app.add_system_to_stage(
|
app.add_system_to_stage(
|
||||||
bevy_render::stage::RENDER,
|
bevy_render::stage::RENDER,
|
||||||
ThreadLocalSystem::new(render_system),
|
render_system.thread_local_system(),
|
||||||
)
|
)
|
||||||
.add_system_to_stage(
|
.add_system_to_stage(
|
||||||
bevy_render::stage::POST_RENDER,
|
bevy_render::stage::POST_RENDER,
|
||||||
|
@ -256,7 +256,7 @@ fn main() {
|
|||||||
.init_resource::<GameState>()
|
.init_resource::<GameState>()
|
||||||
// Startup systems run exactly once BEFORE all other systems. These are generally used for
|
// Startup systems run exactly once BEFORE all other systems. These are generally used for
|
||||||
// app initialization code (ex: adding entities and resources)
|
// app initialization code (ex: adding entities and resources)
|
||||||
.add_startup_system(ThreadLocalSystem::new(startup_system))
|
.add_startup_system(startup_system.thread_local_system())
|
||||||
// my_system.system() calls converts normal rust functions into ECS systems:
|
// my_system.system() calls converts normal rust functions into ECS systems:
|
||||||
.add_system(print_message_system.system())
|
.add_system(print_message_system.system())
|
||||||
//
|
//
|
||||||
|
@ -10,7 +10,7 @@ fn main() {
|
|||||||
// The core Bevy plugins already register their components, so you only need this step for custom components.
|
// The core Bevy plugins already register their components, so you only need this step for custom components.
|
||||||
.register_component::<ComponentA>()
|
.register_component::<ComponentA>()
|
||||||
.register_component::<ComponentB>()
|
.register_component::<ComponentB>()
|
||||||
.add_startup_system(ThreadLocalSystem::new(save_scene_system))
|
.add_startup_system(save_scene_system.thread_local_system())
|
||||||
.add_startup_system(load_scene_system.system())
|
.add_startup_system(load_scene_system.system())
|
||||||
.add_system(print_system.system())
|
.add_system(print_system.system())
|
||||||
.run();
|
.run();
|
||||||
|
@ -11,7 +11,7 @@ pub use crate::{
|
|||||||
diagnostic::DiagnosticsPlugin,
|
diagnostic::DiagnosticsPlugin,
|
||||||
ecs::{
|
ecs::{
|
||||||
Bundle, Commands, Component, Entity, FromResources, IntoForEachSystem, IntoQuerySystem,
|
Bundle, Commands, Component, Entity, FromResources, IntoForEachSystem, IntoQuerySystem,
|
||||||
Local, Query, Ref, RefMut, Res, ResMut, Resource, Resources, System, ThreadLocalSystem,
|
IntoThreadLocalSystem, Local, Query, Ref, RefMut, Res, ResMut, Resource, Resources, System,
|
||||||
World, WorldBuilderSource,
|
World, WorldBuilderSource,
|
||||||
},
|
},
|
||||||
input::{keyboard::KeyCode, mouse::MouseButton, Input},
|
input::{keyboard::KeyCode, mouse::MouseButton, Input},
|
||||||
|
Loading…
Reference in New Issue
Block a user