Stageless: move MainThreadExecutor to schedule_v3 (#7444)

# Objective

- Trying to move some of the fixes from https://github.com/bevyengine/bevy/pull/7267 to make that one easier to review
- The MainThreadExecutor is how the render world runs nonsend systems on the main thread for pipelined rendering.
- The multithread executor for stageless wasn't using the MainThreadExecutor.
- MainThreadExecutor was declared in the old executor_parallel module that is getting deleted.
- The way the MainThreadExecutor was getting passed to the scope was actually unsound as the resource could be dropped from the World while the schedule was running

## Solution

- Move MainThreadExecutor to the new multithreaded_executor's file.
- Make the multithreaded executor use the MainThreadExecutor
- Clone the MainThreadExecutor onto the stack and pass that ref in

## Changelog

- Move MainThreadExecutor for stageless migration.
This commit is contained in:
Mike 2023-02-03 03:19:41 +00:00
parent 27e20df6de
commit 4f3ed196fa
4 changed files with 65 additions and 51 deletions

View File

@ -1,15 +1,12 @@
use std::sync::Arc;
use crate as bevy_ecs;
use crate::{ use crate::{
archetype::ArchetypeComponentId, archetype::ArchetypeComponentId,
query::Access, query::Access,
schedule::{ParallelSystemExecutor, SystemContainer}, schedule::{ParallelSystemExecutor, SystemContainer},
system::Resource, schedule_v3::MainThreadExecutor,
world::World, world::World,
}; };
use async_channel::{Receiver, Sender}; use async_channel::{Receiver, Sender};
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
#[cfg(feature = "trace")] #[cfg(feature = "trace")]
use bevy_utils::tracing::Instrument; use bevy_utils::tracing::Instrument;
use event_listener::Event; use event_listener::Event;
@ -18,16 +15,6 @@ use fixedbitset::FixedBitSet;
#[cfg(test)] #[cfg(test)]
use scheduling_event::*; use scheduling_event::*;
/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
#[derive(Resource, Default, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
impl MainThreadExecutor {
pub fn new() -> Self {
MainThreadExecutor(Arc::new(ThreadExecutor::new()))
}
}
struct SystemSchedulingMetadata { struct SystemSchedulingMetadata {
/// Used to signal the system's task to start the system. /// Used to signal the system's task to start the system.
start: Event, start: Event,
@ -138,7 +125,10 @@ impl ParallelSystemExecutor for ParallelExecutor {
} }
} }
let thread_executor = world.get_resource::<MainThreadExecutor>().map(|e| &*e.0); let thread_executor = world
.get_resource::<MainThreadExecutor>()
.map(|e| e.0.clone());
let thread_executor = thread_executor.as_deref();
ComputeTaskPool::init(TaskPool::default).scope_with_executor( ComputeTaskPool::init(TaskPool::default).scope_with_executor(
false, false,

View File

@ -2,7 +2,7 @@ mod multi_threaded;
mod simple; mod simple;
mod single_threaded; mod single_threaded;
pub use self::multi_threaded::MultiThreadedExecutor; pub use self::multi_threaded::{MainThreadExecutor, MultiThreadedExecutor};
pub use self::simple::SimpleExecutor; pub use self::simple::SimpleExecutor;
pub use self::single_threaded::SingleThreadedExecutor; pub use self::single_threaded::SingleThreadedExecutor;

View File

@ -1,14 +1,16 @@
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
use bevy_utils::default; use bevy_utils::default;
use bevy_utils::syncunsafecell::SyncUnsafeCell; use bevy_utils::syncunsafecell::SyncUnsafeCell;
#[cfg(feature = "trace")] #[cfg(feature = "trace")]
use bevy_utils::tracing::{info_span, Instrument}; use bevy_utils::tracing::{info_span, Instrument};
use std::sync::Arc;
use async_channel::{Receiver, Sender}; use async_channel::{Receiver, Sender};
use fixedbitset::FixedBitSet; use fixedbitset::FixedBitSet;
use crate::{ use crate::{
archetype::ArchetypeComponentId, archetype::ArchetypeComponentId,
prelude::Resource,
query::Access, query::Access,
schedule_v3::{ schedule_v3::{
is_apply_system_buffers, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule, is_apply_system_buffers, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule,
@ -17,6 +19,8 @@ use crate::{
world::World, world::World,
}; };
use crate as bevy_ecs;
/// A funky borrow split of [`SystemSchedule`] required by the [`MultiThreadedExecutor`]. /// A funky borrow split of [`SystemSchedule`] required by the [`MultiThreadedExecutor`].
struct SyncUnsafeSchedule<'a> { struct SyncUnsafeSchedule<'a> {
systems: &'a [SyncUnsafeCell<BoxedSystem>], systems: &'a [SyncUnsafeCell<BoxedSystem>],
@ -145,47 +149,56 @@ impl SystemExecutor for MultiThreadedExecutor {
} }
} }
let thread_executor = world
.get_resource::<MainThreadExecutor>()
.map(|e| e.0.clone());
let thread_executor = thread_executor.as_deref();
let world = SyncUnsafeCell::from_mut(world); let world = SyncUnsafeCell::from_mut(world);
let SyncUnsafeSchedule { let SyncUnsafeSchedule {
systems, systems,
mut conditions, mut conditions,
} = SyncUnsafeSchedule::new(schedule); } = SyncUnsafeSchedule::new(schedule);
ComputeTaskPool::init(TaskPool::default).scope(|scope| { ComputeTaskPool::init(TaskPool::default).scope_with_executor(
// the executor itself is a `Send` future so that it can run false,
// alongside systems that claim the local thread thread_executor,
let executor = async { |scope| {
while self.num_completed_systems < num_systems { // the executor itself is a `Send` future so that it can run
// SAFETY: self.ready_systems does not contain running systems // alongside systems that claim the local thread
unsafe { let executor = async {
self.spawn_system_tasks(scope, systems, &mut conditions, world); while self.num_completed_systems < num_systems {
} // SAFETY: self.ready_systems does not contain running systems
unsafe {
if self.num_running_systems > 0 { self.spawn_system_tasks(scope, systems, &mut conditions, world);
// wait for systems to complete
let index = self
.receiver
.recv()
.await
.unwrap_or_else(|error| unreachable!("{}", error));
self.finish_system_and_signal_dependents(index);
while let Ok(index) = self.receiver.try_recv() {
self.finish_system_and_signal_dependents(index);
} }
self.rebuild_active_access(); if self.num_running_systems > 0 {
} // wait for systems to complete
} let index = self
}; .receiver
.recv()
.await
.unwrap_or_else(|error| unreachable!("{}", error));
#[cfg(feature = "trace")] self.finish_system_and_signal_dependents(index);
let executor_span = info_span!("schedule_task");
#[cfg(feature = "trace")] while let Ok(index) = self.receiver.try_recv() {
let executor = executor.instrument(executor_span); self.finish_system_and_signal_dependents(index);
scope.spawn(executor); }
});
self.rebuild_active_access();
}
}
};
#[cfg(feature = "trace")]
let executor_span = info_span!("schedule_task");
#[cfg(feature = "trace")]
let executor = executor.instrument(executor_span);
scope.spawn(executor);
},
);
// Do one final apply buffers after all systems have completed // Do one final apply buffers after all systems have completed
// SAFETY: all systems have completed, and so no outstanding accesses remain // SAFETY: all systems have completed, and so no outstanding accesses remain
@ -574,3 +587,13 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World
}) })
.fold(true, |acc, res| acc && res) .fold(true, |acc, res| acc && res)
} }
/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
#[derive(Resource, Default, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
impl MainThreadExecutor {
pub fn new() -> Self {
MainThreadExecutor(Arc::new(ThreadExecutor::new()))
}
}

View File

@ -2,7 +2,8 @@ use async_channel::{Receiver, Sender};
use bevy_app::{App, AppLabel, Plugin, SubApp}; use bevy_app::{App, AppLabel, Plugin, SubApp};
use bevy_ecs::{ use bevy_ecs::{
schedule::{MainThreadExecutor, StageLabel, SystemStage}, schedule::{StageLabel, SystemStage},
schedule_v3::MainThreadExecutor,
system::Resource, system::Resource,
world::{Mut, World}, world::{Mut, World},
}; };