Schedule build pass (#11094)

# Objective

This is a follow up to #9822, which automatically adds sync points
during the Schedule build process.

However, the implementation in #9822 feels very "special case" to me. As
the number of things we want to do with the `Schedule` grows, we need a
modularized way to manage those behaviors. For example, in one of my
current experiments I want to automatically add systems to apply GPU
pipeline barriers between systems accessing GPU resources.

For dynamic modifications of the schedule, we mostly need these
capabilities:
- Storing custom data on schedule edges
- Storing custom data on schedule nodes
- Modify the schedule graph whenever it builds

These should be enough to allows us to add "hooks" to the schedule build
process for various reasons.

cc @hymm 

## Solution
This PR abstracts the process of schedule modification and created a new
trait, `ScheduleBuildPass`. Most of the logics in #9822 were moved to an
implementation of `ScheduleBuildPass`, `AutoInsertApplyDeferredPass`.

Whether a dependency edge should "ignore deferred" is now indicated by
the presence of a marker struct, `IgnoreDeferred`.

This PR has no externally visible effects. However, in a future PR I
propose to change the `before_ignore_deferred` and
`after_ignore_deferred` API into a more general form,
`before_with_options` and `after_with_options`.

```rs
schedule.add_systems(
    system.before_with_options(another_system, IgnoreDeferred)
);

schedule.add_systems(
    system.before_with_options(another_system, (
        IgnoreDeferred,
        AnyOtherOption {
            key: value
        }
    ))
);

schedule.add_systems(
    system.before_with_options(another_system, ())
);
```
This commit is contained in:
Zhixing Zhang 2025-02-05 15:14:05 -08:00 committed by GitHub
parent 9ea9c5df00
commit f2a65c2dd3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 383 additions and 167 deletions

View File

@ -0,0 +1,161 @@
use alloc::{boxed::Box, collections::BTreeSet, vec::Vec};
use bevy_platform_support::collections::HashMap;
use crate::system::IntoSystem;
use crate::world::World;
use super::{
is_apply_deferred, ApplyDeferred, DiGraph, Direction, NodeId, ReportCycles, ScheduleBuildError,
ScheduleBuildPass, ScheduleGraph, SystemNode,
};
/// A [`ScheduleBuildPass`] that inserts [`ApplyDeferred`] systems into the schedule graph
/// when there are [`Deferred`](crate::prelude::Deferred)
/// in one system and there are ordering dependencies on that system. [`Commands`](crate::system::Commands) is one
/// such deferred buffer.
///
/// This pass is typically automatically added to the schedule. You can disable this by setting
/// [`ScheduleBuildSettings::auto_insert_apply_deferred`](crate::schedule::ScheduleBuildSettings::auto_insert_apply_deferred)
/// to `false`. You may want to disable this if you only want to sync deferred params at the end of the schedule,
/// or want to manually insert all your sync points.
#[derive(Debug, Default)]
pub struct AutoInsertApplyDeferredPass {
/// Dependency edges that will **not** automatically insert an instance of `ApplyDeferred` on the edge.
no_sync_edges: BTreeSet<(NodeId, NodeId)>,
auto_sync_node_ids: HashMap<u32, NodeId>,
}
/// If added to a dependency edge, the edge will not be considered for auto sync point insertions.
pub struct IgnoreDeferred;
impl AutoInsertApplyDeferredPass {
/// Returns the `NodeId` of the cached auto sync point. Will create
/// a new one if needed.
fn get_sync_point(&mut self, graph: &mut ScheduleGraph, distance: u32) -> NodeId {
self.auto_sync_node_ids
.get(&distance)
.copied()
.or_else(|| {
let node_id = self.add_auto_sync(graph);
self.auto_sync_node_ids.insert(distance, node_id);
Some(node_id)
})
.unwrap()
}
/// add an [`ApplyDeferred`] system with no config
fn add_auto_sync(&mut self, graph: &mut ScheduleGraph) -> NodeId {
let id = NodeId::System(graph.systems.len());
graph
.systems
.push(SystemNode::new(Box::new(IntoSystem::into_system(
ApplyDeferred,
))));
graph.system_conditions.push(Vec::new());
// ignore ambiguities with auto sync points
// They aren't under user control, so no one should know or care.
graph.ambiguous_with_all.insert(id);
id
}
}
impl ScheduleBuildPass for AutoInsertApplyDeferredPass {
type EdgeOptions = IgnoreDeferred;
fn add_dependency(&mut self, from: NodeId, to: NodeId, options: Option<&Self::EdgeOptions>) {
if options.is_some() {
self.no_sync_edges.insert((from, to));
}
}
fn build(
&mut self,
_world: &mut World,
graph: &mut ScheduleGraph,
dependency_flattened: &mut DiGraph,
) -> Result<(), ScheduleBuildError> {
let mut sync_point_graph = dependency_flattened.clone();
let topo = graph.topsort_graph(dependency_flattened, ReportCycles::Dependency)?;
// calculate the number of sync points each sync point is from the beginning of the graph
// use the same sync point if the distance is the same
let mut distances: HashMap<usize, Option<u32>> =
HashMap::with_capacity_and_hasher(topo.len(), Default::default());
for node in &topo {
let add_sync_after = graph.systems[node.index()].get().unwrap().has_deferred();
for target in dependency_flattened.neighbors_directed(*node, Direction::Outgoing) {
let add_sync_on_edge = add_sync_after
&& !is_apply_deferred(graph.systems[target.index()].get().unwrap())
&& !self.no_sync_edges.contains(&(*node, target));
let weight = if add_sync_on_edge { 1 } else { 0 };
let distance = distances
.get(&target.index())
.unwrap_or(&None)
.or(Some(0))
.map(|distance| {
distance.max(
distances.get(&node.index()).unwrap_or(&None).unwrap_or(0) + weight,
)
});
distances.insert(target.index(), distance);
if add_sync_on_edge {
let sync_point =
self.get_sync_point(graph, distances[&target.index()].unwrap());
sync_point_graph.add_edge(*node, sync_point);
sync_point_graph.add_edge(sync_point, target);
// edge is now redundant
sync_point_graph.remove_edge(*node, target);
}
}
}
*dependency_flattened = sync_point_graph;
Ok(())
}
fn collapse_set(
&mut self,
set: NodeId,
systems: &[NodeId],
dependency_flattened: &DiGraph,
) -> impl Iterator<Item = (NodeId, NodeId)> {
if systems.is_empty() {
// collapse dependencies for empty sets
for a in dependency_flattened.neighbors_directed(set, Direction::Incoming) {
for b in dependency_flattened.neighbors_directed(set, Direction::Outgoing) {
if self.no_sync_edges.contains(&(a, set))
&& self.no_sync_edges.contains(&(set, b))
{
self.no_sync_edges.insert((a, b));
}
}
}
} else {
for a in dependency_flattened.neighbors_directed(set, Direction::Incoming) {
for &sys in systems {
if self.no_sync_edges.contains(&(a, set)) {
self.no_sync_edges.insert((a, sys));
}
}
}
for b in dependency_flattened.neighbors_directed(set, Direction::Outgoing) {
for &sys in systems {
if self.no_sync_edges.contains(&(set, b)) {
self.no_sync_edges.insert((sys, b));
}
}
}
}
core::iter::empty()
}
}

View File

@ -4,6 +4,7 @@ use variadics_please::all_tuples;
use crate::{ use crate::{
result::Result, result::Result,
schedule::{ schedule::{
auto_insert_apply_deferred::IgnoreDeferred,
condition::{BoxedCondition, Condition}, condition::{BoxedCondition, Condition},
graph::{Ambiguity, Dependency, DependencyKind, GraphInfo}, graph::{Ambiguity, Dependency, DependencyKind, GraphInfo},
set::{InternedSystemSet, IntoSystemSet, SystemSet}, set::{InternedSystemSet, IntoSystemSet, SystemSet},
@ -137,7 +138,7 @@ impl<T> NodeConfigs<T> {
config config
.graph_info .graph_info
.dependencies .dependencies
.push(Dependency::new(DependencyKind::BeforeNoSync, set)); .push(Dependency::new(DependencyKind::Before, set).add_config(IgnoreDeferred));
} }
Self::Configs { configs, .. } => { Self::Configs { configs, .. } => {
for config in configs { for config in configs {
@ -153,7 +154,7 @@ impl<T> NodeConfigs<T> {
config config
.graph_info .graph_info
.dependencies .dependencies
.push(Dependency::new(DependencyKind::AfterNoSync, set)); .push(Dependency::new(DependencyKind::After, set).add_config(IgnoreDeferred));
} }
Self::Configs { configs, .. } => { Self::Configs { configs, .. } => {
for config in configs { for config in configs {
@ -224,9 +225,9 @@ impl<T> NodeConfigs<T> {
match &mut self { match &mut self {
Self::NodeConfig(_) => { /* no op */ } Self::NodeConfig(_) => { /* no op */ }
Self::Configs { chained, .. } => { Self::Configs { chained, .. } => {
*chained = Chain::Yes; chained.set_chained();
}
} }
};
self self
} }
@ -234,7 +235,7 @@ impl<T> NodeConfigs<T> {
match &mut self { match &mut self {
Self::NodeConfig(_) => { /* no op */ } Self::NodeConfig(_) => { /* no op */ }
Self::Configs { chained, .. } => { Self::Configs { chained, .. } => {
*chained = Chain::YesIgnoreDeferred; chained.set_chained_with_config(IgnoreDeferred);
} }
} }
self self
@ -582,7 +583,7 @@ macro_rules! impl_system_collection {
SystemConfigs::Configs { SystemConfigs::Configs {
configs: vec![$($sys.into_configs(),)*], configs: vec![$($sys.into_configs(),)*],
collective_conditions: Vec::new(), collective_conditions: Vec::new(),
chained: Chain::No, chained: Default::default(),
} }
} }
} }
@ -820,7 +821,7 @@ macro_rules! impl_system_set_collection {
SystemSetConfigs::Configs { SystemSetConfigs::Configs {
configs: vec![$($set.into_configs(),)*], configs: vec![$($set.into_configs(),)*],
collective_conditions: Vec::new(), collective_conditions: Vec::new(),
chained: Chain::No, chained: Default::default(),
} }
} }
} }

View File

@ -65,7 +65,7 @@ where
S: BuildHasher, S: BuildHasher,
{ {
/// Create a new `Graph` with estimated capacity. /// Create a new `Graph` with estimated capacity.
pub(crate) fn with_capacity(nodes: usize, edges: usize) -> Self pub fn with_capacity(nodes: usize, edges: usize) -> Self
where where
S: Default, S: Default,
{ {
@ -89,14 +89,14 @@ where
} }
/// Add node `n` to the graph. /// Add node `n` to the graph.
pub(crate) fn add_node(&mut self, n: NodeId) { pub fn add_node(&mut self, n: NodeId) {
self.nodes.entry(n).or_default(); self.nodes.entry(n).or_default();
} }
/// Remove a node `n` from the graph. /// Remove a node `n` from the graph.
/// ///
/// Computes in **O(N)** time, due to the removal of edges with other nodes. /// Computes in **O(N)** time, due to the removal of edges with other nodes.
pub(crate) fn remove_node(&mut self, n: NodeId) { pub fn remove_node(&mut self, n: NodeId) {
let Some(links) = self.nodes.swap_remove(&n) else { let Some(links) = self.nodes.swap_remove(&n) else {
return; return;
}; };
@ -166,7 +166,7 @@ where
/// Remove edge from `a` to `b` from the graph. /// Remove edge from `a` to `b` from the graph.
/// ///
/// Return `false` if the edge didn't exist. /// Return `false` if the edge didn't exist.
pub(crate) fn remove_edge(&mut self, a: NodeId, b: NodeId) -> bool { pub fn remove_edge(&mut self, a: NodeId, b: NodeId) -> bool {
let exist1 = self.remove_single_edge(a, b, Outgoing); let exist1 = self.remove_single_edge(a, b, Outgoing);
let exist2 = if a != b { let exist2 = if a != b {
self.remove_single_edge(b, a, Incoming) self.remove_single_edge(b, a, Incoming)

View File

@ -1,8 +1,13 @@
use alloc::{vec, vec::Vec}; use alloc::{boxed::Box, vec, vec::Vec};
use core::fmt::Debug; use core::{
any::{Any, TypeId},
fmt::Debug,
};
use smallvec::SmallVec; use smallvec::SmallVec;
use bevy_platform_support::collections::{HashMap, HashSet}; use bevy_platform_support::collections::{HashMap, HashSet};
use bevy_utils::TypeIdMap;
use fixedbitset::FixedBitSet; use fixedbitset::FixedBitSet;
use crate::schedule::set::*; use crate::schedule::set::*;
@ -21,22 +26,26 @@ pub(crate) enum DependencyKind {
Before, Before,
/// A node that should be succeeded. /// A node that should be succeeded.
After, After,
/// A node that should be preceded and will **not** automatically insert an instance of `ApplyDeferred` on the edge.
BeforeNoSync,
/// A node that should be succeeded and will **not** automatically insert an instance of `ApplyDeferred` on the edge.
AfterNoSync,
} }
/// An edge to be added to the dependency graph. /// An edge to be added to the dependency graph.
#[derive(Clone)]
pub(crate) struct Dependency { pub(crate) struct Dependency {
pub(crate) kind: DependencyKind, pub(crate) kind: DependencyKind,
pub(crate) set: InternedSystemSet, pub(crate) set: InternedSystemSet,
pub(crate) options: TypeIdMap<Box<dyn Any>>,
} }
impl Dependency { impl Dependency {
pub fn new(kind: DependencyKind, set: InternedSystemSet) -> Self { pub fn new(kind: DependencyKind, set: InternedSystemSet) -> Self {
Self { kind, set } Self {
kind,
set,
options: Default::default(),
}
}
pub fn add_config<T: 'static>(mut self, option: T) -> Self {
self.options.insert(TypeId::of::<T>(), Box::new(option));
self
} }
} }
@ -52,7 +61,7 @@ pub(crate) enum Ambiguity {
} }
/// Metadata about how the node fits in the schedule graph /// Metadata about how the node fits in the schedule graph
#[derive(Clone, Default)] #[derive(Default)]
pub(crate) struct GraphInfo { pub(crate) struct GraphInfo {
/// the sets that the node belongs to (hierarchy) /// the sets that the node belongs to (hierarchy)
pub(crate) hierarchy: Vec<InternedSystemSet>, pub(crate) hierarchy: Vec<InternedSystemSet>,

View File

@ -13,7 +13,7 @@ pub enum NodeId {
impl NodeId { impl NodeId {
/// Returns the internal integer value. /// Returns the internal integer value.
pub(crate) const fn index(&self) -> usize { pub const fn index(&self) -> usize {
match self { match self {
NodeId::System(index) | NodeId::Set(index) => *index, NodeId::System(index) | NodeId::Set(index) => *index,
} }

View File

@ -1,18 +1,28 @@
//! Contains APIs for ordering systems and executing them on a [`World`](crate::world::World) //! Contains APIs for ordering systems and executing them on a [`World`](crate::world::World)
mod auto_insert_apply_deferred;
mod condition; mod condition;
mod config; mod config;
mod executor; mod executor;
mod graph; mod pass;
mod schedule; mod schedule;
mod set; mod set;
mod stepping; mod stepping;
use self::graph::*; use self::graph::*;
pub use self::{condition::*, config::*, executor::*, schedule::*, set::*}; pub use self::{condition::*, config::*, executor::*, schedule::*, set::*};
pub use pass::ScheduleBuildPass;
pub use self::graph::NodeId; pub use self::graph::NodeId;
/// An implementation of a graph data structure.
pub mod graph;
/// Included optional schedule build passes.
pub mod passes {
pub use crate::schedule::auto_insert_apply_deferred::*;
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -1082,7 +1092,7 @@ mod tests {
schedule.graph_mut().initialize(&mut world); schedule.graph_mut().initialize(&mut world);
let _ = schedule.graph_mut().build_schedule( let _ = schedule.graph_mut().build_schedule(
world.components(), &mut world,
TestSchedule.intern(), TestSchedule.intern(),
&BTreeSet::new(), &BTreeSet::new(),
); );
@ -1131,7 +1141,7 @@ mod tests {
let mut world = World::new(); let mut world = World::new();
schedule.graph_mut().initialize(&mut world); schedule.graph_mut().initialize(&mut world);
let _ = schedule.graph_mut().build_schedule( let _ = schedule.graph_mut().build_schedule(
world.components(), &mut world,
TestSchedule.intern(), TestSchedule.intern(),
&BTreeSet::new(), &BTreeSet::new(),
); );

View File

@ -0,0 +1,79 @@
use alloc::{boxed::Box, vec::Vec};
use core::any::{Any, TypeId};
use super::{DiGraph, NodeId, ScheduleBuildError, ScheduleGraph};
use crate::world::World;
use bevy_utils::TypeIdMap;
use core::fmt::Debug;
/// A pass for modular modification of the dependency graph.
pub trait ScheduleBuildPass: Send + Sync + Debug + 'static {
/// Custom options for dependencies between sets or systems.
type EdgeOptions: 'static;
/// Called when a dependency between sets or systems was explicitly added to the graph.
fn add_dependency(&mut self, from: NodeId, to: NodeId, options: Option<&Self::EdgeOptions>);
/// Called while flattening the dependency graph. For each `set`, this method is called
/// with the `systems` associated with the set as well as an immutable reference to the current graph.
/// Instead of modifying the graph directly, this method should return an iterator of edges to add to the graph.
fn collapse_set(
&mut self,
set: NodeId,
systems: &[NodeId],
dependency_flattened: &DiGraph,
) -> impl Iterator<Item = (NodeId, NodeId)>;
/// The implementation will be able to modify the `ScheduleGraph` here.
fn build(
&mut self,
world: &mut World,
graph: &mut ScheduleGraph,
dependency_flattened: &mut DiGraph,
) -> Result<(), ScheduleBuildError>;
}
/// Object safe version of [`ScheduleBuildPass`].
pub(super) trait ScheduleBuildPassObj: Send + Sync + Debug {
fn build(
&mut self,
world: &mut World,
graph: &mut ScheduleGraph,
dependency_flattened: &mut DiGraph,
) -> Result<(), ScheduleBuildError>;
fn collapse_set(
&mut self,
set: NodeId,
systems: &[NodeId],
dependency_flattened: &DiGraph,
dependencies_to_add: &mut Vec<(NodeId, NodeId)>,
);
fn add_dependency(&mut self, from: NodeId, to: NodeId, all_options: &TypeIdMap<Box<dyn Any>>);
}
impl<T: ScheduleBuildPass> ScheduleBuildPassObj for T {
fn build(
&mut self,
world: &mut World,
graph: &mut ScheduleGraph,
dependency_flattened: &mut DiGraph,
) -> Result<(), ScheduleBuildError> {
self.build(world, graph, dependency_flattened)
}
fn collapse_set(
&mut self,
set: NodeId,
systems: &[NodeId],
dependency_flattened: &DiGraph,
dependencies_to_add: &mut Vec<(NodeId, NodeId)>,
) {
let iter = self.collapse_set(set, systems, dependency_flattened);
dependencies_to_add.extend(iter);
}
fn add_dependency(&mut self, from: NodeId, to: NodeId, all_options: &TypeIdMap<Box<dyn Any>>) {
let option = all_options
.get(&TypeId::of::<T::EdgeOptions>())
.and_then(|x| x.downcast_ref::<T::EdgeOptions>());
self.add_dependency(from, to, option);
}
}

View File

@ -4,18 +4,22 @@
)] )]
use alloc::{ use alloc::{
boxed::Box, boxed::Box,
collections::BTreeSet, collections::{BTreeMap, BTreeSet},
format, format,
string::{String, ToString}, string::{String, ToString},
vec, vec,
vec::Vec, vec::Vec,
}; };
use bevy_platform_support::collections::{HashMap, HashSet}; use bevy_platform_support::collections::{HashMap, HashSet};
use bevy_utils::default; use bevy_utils::{default, TypeIdMap};
use core::fmt::{Debug, Write}; use core::{
any::{Any, TypeId},
fmt::{Debug, Write},
};
use disqualified::ShortName; use disqualified::ShortName;
use fixedbitset::FixedBitSet; use fixedbitset::FixedBitSet;
use log::{error, info, warn}; use log::{error, info, warn};
use pass::ScheduleBuildPassObj;
use thiserror::Error; use thiserror::Error;
#[cfg(feature = "trace")] #[cfg(feature = "trace")]
use tracing::info_span; use tracing::info_span;
@ -27,7 +31,7 @@ use crate::{
resource::Resource, resource::Resource,
result::Result, result::Result,
schedule::*, schedule::*,
system::{IntoSystem, ScheduleSystem}, system::ScheduleSystem,
world::World, world::World,
}; };
@ -225,15 +229,32 @@ fn make_executor(kind: ExecutorKind) -> Box<dyn SystemExecutor> {
} }
/// Chain systems into dependencies /// Chain systems into dependencies
#[derive(PartialEq)] #[derive(Default)]
pub enum Chain { pub enum Chain {
/// Run nodes in order. If there are deferred parameters in preceding systems a /// Systems are independent. Nodes are allowed to run in any order.
/// [`ApplyDeferred`] will be added on the edge. #[default]
Yes, Unchained,
/// Run nodes in order. This will not add [`ApplyDeferred`] between nodes. /// Systems are chained. `before -> after` ordering constraints
YesIgnoreDeferred, /// will be added between the successive elements.
/// Nodes are allowed to run in any order. Chained(TypeIdMap<Box<dyn Any>>),
No, }
impl Chain {
/// Specify that the systems must be chained.
pub fn set_chained(&mut self) {
if matches!(self, Chain::Unchained) {
*self = Self::Chained(Default::default());
};
}
/// Specify that the systems must be chained, and add the specified configuration for
/// all dependencies created between these systems.
pub fn set_chained_with_config<T: 'static>(&mut self, config: T) {
self.set_chained();
if let Chain::Chained(config_map) = self {
config_map.insert(TypeId::of::<T>(), Box::new(config));
} else {
unreachable!()
};
}
} }
/// A collection of systems, and the metadata and executor needed to run them /// A collection of systems, and the metadata and executor needed to run them
@ -297,13 +318,16 @@ impl Default for Schedule {
impl Schedule { impl Schedule {
/// Constructs an empty `Schedule`. /// Constructs an empty `Schedule`.
pub fn new(label: impl ScheduleLabel) -> Self { pub fn new(label: impl ScheduleLabel) -> Self {
Self { let mut this = Self {
label: label.intern(), label: label.intern(),
graph: ScheduleGraph::new(), graph: ScheduleGraph::new(),
executable: SystemSchedule::new(), executable: SystemSchedule::new(),
executor: make_executor(ExecutorKind::default()), executor: make_executor(ExecutorKind::default()),
executor_initialized: false, executor_initialized: false,
} };
// Call `set_build_settings` to add any default build passes
this.set_build_settings(Default::default());
this
} }
/// Get the `InternedScheduleLabel` for this `Schedule`. /// Get the `InternedScheduleLabel` for this `Schedule`.
@ -355,8 +379,24 @@ impl Schedule {
self self
} }
/// Add a custom build pass to the schedule.
pub fn add_build_pass<T: ScheduleBuildPass>(&mut self, pass: T) -> &mut Self {
self.graph.passes.insert(TypeId::of::<T>(), Box::new(pass));
self
}
/// Remove a custom build pass.
pub fn remove_build_pass<T: ScheduleBuildPass>(&mut self) {
self.graph.passes.remove(&TypeId::of::<T>());
}
/// Changes miscellaneous build settings. /// Changes miscellaneous build settings.
pub fn set_build_settings(&mut self, settings: ScheduleBuildSettings) -> &mut Self { pub fn set_build_settings(&mut self, settings: ScheduleBuildSettings) -> &mut Self {
if settings.auto_insert_apply_deferred {
self.add_build_pass(passes::AutoInsertApplyDeferredPass::default());
} else {
self.remove_build_pass::<passes::AutoInsertApplyDeferredPass>();
}
self.graph.settings = settings; self.graph.settings = settings;
self self
} }
@ -425,8 +465,8 @@ impl Schedule {
.ignored_scheduling_ambiguities .ignored_scheduling_ambiguities
.clone(); .clone();
self.graph.update_schedule( self.graph.update_schedule(
world,
&mut self.executable, &mut self.executable,
world.components(),
&ignored_ambiguities, &ignored_ambiguities,
self.label, self.label,
)?; )?;
@ -580,21 +620,24 @@ impl SystemSetNode {
} }
/// A [`ScheduleSystem`] stored in a [`ScheduleGraph`]. /// A [`ScheduleSystem`] stored in a [`ScheduleGraph`].
struct SystemNode { pub struct SystemNode {
inner: Option<ScheduleSystem>, inner: Option<ScheduleSystem>,
} }
impl SystemNode { impl SystemNode {
/// Create a new [`SystemNode`]
pub fn new(system: ScheduleSystem) -> Self { pub fn new(system: ScheduleSystem) -> Self {
Self { Self {
inner: Some(system), inner: Some(system),
} }
} }
/// Obtain a reference to the [`ScheduleSystem`] represented by this node.
pub fn get(&self) -> Option<&ScheduleSystem> { pub fn get(&self) -> Option<&ScheduleSystem> {
self.inner.as_ref() self.inner.as_ref()
} }
/// Obtain a mutable reference to the [`ScheduleSystem`] represented by this node.
pub fn get_mut(&mut self) -> Option<&mut ScheduleSystem> { pub fn get_mut(&mut self) -> Option<&mut ScheduleSystem> {
self.inner.as_mut() self.inner.as_mut()
} }
@ -607,9 +650,9 @@ impl SystemNode {
#[derive(Default)] #[derive(Default)]
pub struct ScheduleGraph { pub struct ScheduleGraph {
/// List of systems in the schedule /// List of systems in the schedule
systems: Vec<SystemNode>, pub systems: Vec<SystemNode>,
/// List of conditions for each system, in the same order as `systems` /// List of conditions for each system, in the same order as `systems`
system_conditions: Vec<Vec<BoxedCondition>>, pub system_conditions: Vec<Vec<BoxedCondition>>,
/// List of system sets in the schedule /// List of system sets in the schedule
system_sets: Vec<SystemSetNode>, system_sets: Vec<SystemSetNode>,
/// List of conditions for each system set, in the same order as `system_sets` /// List of conditions for each system set, in the same order as `system_sets`
@ -624,14 +667,14 @@ pub struct ScheduleGraph {
/// Directed acyclic graph of the dependency (which systems/sets have to run before which other systems/sets) /// Directed acyclic graph of the dependency (which systems/sets have to run before which other systems/sets)
dependency: Dag, dependency: Dag,
ambiguous_with: UnGraph, ambiguous_with: UnGraph,
ambiguous_with_all: HashSet<NodeId>, /// Nodes that are allowed to have ambiguous ordering relationship with any other systems.
pub ambiguous_with_all: HashSet<NodeId>,
conflicting_systems: Vec<(NodeId, NodeId, Vec<ComponentId>)>, conflicting_systems: Vec<(NodeId, NodeId, Vec<ComponentId>)>,
anonymous_sets: usize, anonymous_sets: usize,
changed: bool, changed: bool,
settings: ScheduleBuildSettings, settings: ScheduleBuildSettings,
/// Dependency edges that will **not** automatically insert an instance of `apply_deferred` on the edge.
no_sync_edges: BTreeSet<(NodeId, NodeId)>, passes: BTreeMap<TypeId, Box<dyn ScheduleBuildPassObj>>,
auto_sync_node_ids: HashMap<u32, NodeId>,
} }
impl ScheduleGraph { impl ScheduleGraph {
@ -652,8 +695,7 @@ impl ScheduleGraph {
anonymous_sets: 0, anonymous_sets: 0,
changed: false, changed: false,
settings: default(), settings: default(),
no_sync_edges: BTreeSet::new(), passes: default(),
auto_sync_node_ids: HashMap::default(),
} }
} }
@ -806,13 +848,12 @@ impl ScheduleGraph {
} => { } => {
self.apply_collective_conditions(&mut configs, collective_conditions); self.apply_collective_conditions(&mut configs, collective_conditions);
let ignore_deferred = matches!(chained, Chain::YesIgnoreDeferred); let is_chained = matches!(chained, Chain::Chained(_));
let chained = matches!(chained, Chain::Yes | Chain::YesIgnoreDeferred);
// Densely chained if // Densely chained if
// * chained and all configs in the chain are densely chained, or // * chained and all configs in the chain are densely chained, or
// * unchained with a single densely chained config // * unchained with a single densely chained config
let mut densely_chained = chained || configs.len() == 1; let mut densely_chained = is_chained || configs.len() == 1;
let mut configs = configs.into_iter(); let mut configs = configs.into_iter();
let mut nodes = Vec::new(); let mut nodes = Vec::new();
@ -822,14 +863,14 @@ impl ScheduleGraph {
densely_chained, densely_chained,
}; };
}; };
let mut previous_result = self.process_configs(first, collect_nodes || chained); let mut previous_result = self.process_configs(first, collect_nodes || is_chained);
densely_chained &= previous_result.densely_chained; densely_chained &= previous_result.densely_chained;
for current in configs { for current in configs {
let current_result = self.process_configs(current, collect_nodes || chained); let current_result = self.process_configs(current, collect_nodes || is_chained);
densely_chained &= current_result.densely_chained; densely_chained &= current_result.densely_chained;
if chained { if let Chain::Chained(chain_options) = &chained {
// if the current result is densely chained, we only need to chain the first node // if the current result is densely chained, we only need to chain the first node
let current_nodes = if current_result.densely_chained { let current_nodes = if current_result.densely_chained {
&current_result.nodes[..1] &current_result.nodes[..1]
@ -849,8 +890,12 @@ impl ScheduleGraph {
.graph .graph
.add_edge(*previous_node, *current_node); .add_edge(*previous_node, *current_node);
if ignore_deferred { for pass in self.passes.values_mut() {
self.no_sync_edges.insert((*previous_node, *current_node)); pass.add_dependency(
*previous_node,
*current_node,
chain_options,
);
} }
} }
} }
@ -973,7 +1018,7 @@ impl ScheduleGraph {
id: &NodeId, id: &NodeId,
graph_info: &GraphInfo, graph_info: &GraphInfo,
) -> Result<(), ScheduleBuildError> { ) -> Result<(), ScheduleBuildError> {
for Dependency { kind: _, set } in &graph_info.dependencies { for Dependency { set, .. } in &graph_info.dependencies {
match self.system_set_ids.get(set) { match self.system_set_ids.get(set) {
Some(set_id) => { Some(set_id) => {
if id == set_id { if id == set_id {
@ -1024,23 +1069,18 @@ impl ScheduleGraph {
self.dependency.graph.add_node(set); self.dependency.graph.add_node(set);
} }
for (kind, set) in dependencies for (kind, set, options) in dependencies
.into_iter() .into_iter()
.map(|Dependency { kind, set }| (kind, self.system_set_ids[&set])) .map(|Dependency { kind, set, options }| (kind, self.system_set_ids[&set], options))
{ {
let (lhs, rhs) = match kind { let (lhs, rhs) = match kind {
DependencyKind::Before => (id, set), DependencyKind::Before => (id, set),
DependencyKind::BeforeNoSync => {
self.no_sync_edges.insert((id, set));
(id, set)
}
DependencyKind::After => (set, id), DependencyKind::After => (set, id),
DependencyKind::AfterNoSync => {
self.no_sync_edges.insert((set, id));
(set, id)
}
}; };
self.dependency.graph.add_edge(lhs, rhs); self.dependency.graph.add_edge(lhs, rhs);
for pass in self.passes.values_mut() {
pass.add_dependency(lhs, rhs, &options);
}
// ensure set also appears in hierarchy graph // ensure set also appears in hierarchy graph
self.hierarchy.graph.add_node(set); self.hierarchy.graph.add_node(set);
@ -1090,7 +1130,7 @@ impl ScheduleGraph {
/// - checks for system access conflicts and reports ambiguities /// - checks for system access conflicts and reports ambiguities
pub fn build_schedule( pub fn build_schedule(
&mut self, &mut self,
components: &Components, world: &mut World,
schedule_label: InternedScheduleLabel, schedule_label: InternedScheduleLabel,
ignored_ambiguities: &BTreeSet<ComponentId>, ignored_ambiguities: &BTreeSet<ComponentId>,
) -> Result<SystemSchedule, ScheduleBuildError> { ) -> Result<SystemSchedule, ScheduleBuildError> {
@ -1123,10 +1163,12 @@ impl ScheduleGraph {
let mut dependency_flattened = self.get_dependency_flattened(&set_systems); let mut dependency_flattened = self.get_dependency_flattened(&set_systems);
// modify graph with auto sync points // modify graph with build passes
if self.settings.auto_insert_apply_deferred { let mut passes = core::mem::take(&mut self.passes);
dependency_flattened = self.auto_insert_apply_deferred(&mut dependency_flattened)?; for pass in passes.values_mut() {
pass.build(world, self, &mut dependency_flattened)?;
} }
self.passes = passes;
// topsort // topsort
let mut dependency_flattened_dag = Dag { let mut dependency_flattened_dag = Dag {
@ -1151,92 +1193,13 @@ impl ScheduleGraph {
&ambiguous_with_flattened, &ambiguous_with_flattened,
ignored_ambiguities, ignored_ambiguities,
); );
self.optionally_check_conflicts(&conflicting_systems, components, schedule_label)?; self.optionally_check_conflicts(&conflicting_systems, world.components(), schedule_label)?;
self.conflicting_systems = conflicting_systems; self.conflicting_systems = conflicting_systems;
// build the schedule // build the schedule
Ok(self.build_schedule_inner(dependency_flattened_dag, hier_results.reachable)) Ok(self.build_schedule_inner(dependency_flattened_dag, hier_results.reachable))
} }
// modify the graph to have sync nodes for any dependents after a system with deferred system params
fn auto_insert_apply_deferred(
&mut self,
dependency_flattened: &mut DiGraph,
) -> Result<DiGraph, ScheduleBuildError> {
let mut sync_point_graph = dependency_flattened.clone();
let topo = self.topsort_graph(dependency_flattened, ReportCycles::Dependency)?;
// calculate the number of sync points each sync point is from the beginning of the graph
// use the same sync point if the distance is the same
let mut distances: HashMap<usize, Option<u32>> =
HashMap::with_capacity_and_hasher(topo.len(), Default::default());
for node in &topo {
let add_sync_after = self.systems[node.index()].get().unwrap().has_deferred();
for target in dependency_flattened.neighbors_directed(*node, Outgoing) {
let add_sync_on_edge = add_sync_after
&& !is_apply_deferred(self.systems[target.index()].get().unwrap())
&& !self.no_sync_edges.contains(&(*node, target));
let weight = if add_sync_on_edge { 1 } else { 0 };
let distance = distances
.get(&target.index())
.unwrap_or(&None)
.or(Some(0))
.map(|distance| {
distance.max(
distances.get(&node.index()).unwrap_or(&None).unwrap_or(0) + weight,
)
});
distances.insert(target.index(), distance);
if add_sync_on_edge {
let sync_point = self.get_sync_point(distances[&target.index()].unwrap());
sync_point_graph.add_edge(*node, sync_point);
sync_point_graph.add_edge(sync_point, target);
// edge is now redundant
sync_point_graph.remove_edge(*node, target);
}
}
}
Ok(sync_point_graph)
}
/// add an [`ApplyDeferred`] system with no config
fn add_auto_sync(&mut self) -> NodeId {
let id = NodeId::System(self.systems.len());
self.systems
.push(SystemNode::new(Box::new(IntoSystem::into_system(
ApplyDeferred,
))));
self.system_conditions.push(Vec::new());
// ignore ambiguities with auto sync points
// They aren't under user control, so no one should know or care.
self.ambiguous_with_all.insert(id);
id
}
/// Returns the `NodeId` of the cached auto sync point. Will create
/// a new one if needed.
fn get_sync_point(&mut self, distance: u32) -> NodeId {
self.auto_sync_node_ids
.get(&distance)
.copied()
.or_else(|| {
let node_id = self.add_auto_sync();
self.auto_sync_node_ids.insert(distance, node_id);
Some(node_id)
})
.unwrap()
}
/// Return a map from system set `NodeId` to a list of system `NodeId`s that are included in the set. /// Return a map from system set `NodeId` to a list of system `NodeId`s that are included in the set.
/// Also return a map from system set `NodeId` to a `FixedBitSet` of system `NodeId`s that are included in the set, /// Also return a map from system set `NodeId` to a `FixedBitSet` of system `NodeId`s that are included in the set,
/// where the bitset order is the same as `self.systems` /// where the bitset order is the same as `self.systems`
@ -1284,34 +1247,25 @@ impl ScheduleGraph {
let mut dependency_flattened = self.dependency.graph.clone(); let mut dependency_flattened = self.dependency.graph.clone();
let mut temp = Vec::new(); let mut temp = Vec::new();
for (&set, systems) in set_systems { for (&set, systems) in set_systems {
for pass in self.passes.values_mut() {
pass.collapse_set(set, systems, &dependency_flattened, &mut temp);
}
if systems.is_empty() { if systems.is_empty() {
// collapse dependencies for empty sets // collapse dependencies for empty sets
for a in dependency_flattened.neighbors_directed(set, Incoming) { for a in dependency_flattened.neighbors_directed(set, Incoming) {
for b in dependency_flattened.neighbors_directed(set, Outgoing) { for b in dependency_flattened.neighbors_directed(set, Outgoing) {
if self.no_sync_edges.contains(&(a, set))
&& self.no_sync_edges.contains(&(set, b))
{
self.no_sync_edges.insert((a, b));
}
temp.push((a, b)); temp.push((a, b));
} }
} }
} else { } else {
for a in dependency_flattened.neighbors_directed(set, Incoming) { for a in dependency_flattened.neighbors_directed(set, Incoming) {
for &sys in systems { for &sys in systems {
if self.no_sync_edges.contains(&(a, set)) {
self.no_sync_edges.insert((a, sys));
}
temp.push((a, sys)); temp.push((a, sys));
} }
} }
for b in dependency_flattened.neighbors_directed(set, Outgoing) { for b in dependency_flattened.neighbors_directed(set, Outgoing) {
for &sys in systems { for &sys in systems {
if self.no_sync_edges.contains(&(set, b)) {
self.no_sync_edges.insert((sys, b));
}
temp.push((sys, b)); temp.push((sys, b));
} }
} }
@ -1506,8 +1460,8 @@ impl ScheduleGraph {
/// Updates the `SystemSchedule` from the `ScheduleGraph`. /// Updates the `SystemSchedule` from the `ScheduleGraph`.
fn update_schedule( fn update_schedule(
&mut self, &mut self,
world: &mut World,
schedule: &mut SystemSchedule, schedule: &mut SystemSchedule,
components: &Components,
ignored_ambiguities: &BTreeSet<ComponentId>, ignored_ambiguities: &BTreeSet<ComponentId>,
schedule_label: InternedScheduleLabel, schedule_label: InternedScheduleLabel,
) -> Result<(), ScheduleBuildError> { ) -> Result<(), ScheduleBuildError> {
@ -1534,7 +1488,7 @@ impl ScheduleGraph {
self.system_set_conditions[id.index()] = conditions; self.system_set_conditions[id.index()] = conditions;
} }
*schedule = self.build_schedule(components, schedule_label, ignored_ambiguities)?; *schedule = self.build_schedule(world, schedule_label, ignored_ambiguities)?;
// move systems into new schedule // move systems into new schedule
for &id in &schedule.system_ids { for &id in &schedule.system_ids {
@ -1583,8 +1537,10 @@ impl ProcessNodeConfig for InternedSystemSet {
} }
/// Used to select the appropriate reporting function. /// Used to select the appropriate reporting function.
enum ReportCycles { pub enum ReportCycles {
/// When sets contain themselves
Hierarchy, Hierarchy,
/// When the graph is no longer a DAG
Dependency, Dependency,
} }
@ -1701,7 +1657,7 @@ impl ScheduleGraph {
/// # Errors /// # Errors
/// ///
/// If the graph contain cycles, then an error is returned. /// If the graph contain cycles, then an error is returned.
fn topsort_graph( pub fn topsort_graph(
&self, &self,
graph: &DiGraph, graph: &DiGraph,
report: ReportCycles, report: ReportCycles,