Fix for a bug (#449) in scheduler that could result in systems running concurrently when they shouldn't.
This commit is contained in:
parent
8b3553002d
commit
9eba19c8f0
@ -144,6 +144,11 @@ impl ExecutorStage {
|
|||||||
for system_index in prepare_system_index_range.clone() {
|
for system_index in prepare_system_index_range.clone() {
|
||||||
let mut system = systems[system_index].lock();
|
let mut system = systems[system_index].lock();
|
||||||
system.update_archetype_access(world);
|
system.update_archetype_access(world);
|
||||||
|
|
||||||
|
// Clear this so that the next block of code that populates it doesn't insert
|
||||||
|
// duplicates
|
||||||
|
self.system_dependents[system_index].clear();
|
||||||
|
self.system_dependencies[system_index].clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
// calculate dependencies between systems and build execution order
|
// calculate dependencies between systems and build execution order
|
||||||
@ -204,6 +209,27 @@ impl ExecutorStage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify that dependents are not duplicated
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
for system_index in prepare_system_index_range.clone() {
|
||||||
|
let mut system_dependents_set = std::collections::HashSet::new();
|
||||||
|
for dependent_system in &self.system_dependents[system_index] {
|
||||||
|
let inserted = system_dependents_set.insert(*dependent_system);
|
||||||
|
|
||||||
|
// This means duplicate values are in the system_dependents list
|
||||||
|
// This is reproducing when archetypes change. When we fix this, we can remove
|
||||||
|
// the hack below and make this a debug-only assert or remove it
|
||||||
|
debug_assert!(inserted);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear the ready events lists associated with each system so we can rebuild them
|
||||||
|
for ready_events_of_dependents in
|
||||||
|
&mut self.ready_events_of_dependents[prepare_system_index_range.clone()]
|
||||||
|
{
|
||||||
|
ready_events_of_dependents.clear();
|
||||||
|
}
|
||||||
|
|
||||||
// Now that system_dependents and system_dependencies is populated, update
|
// Now that system_dependents and system_dependencies is populated, update
|
||||||
// system_dependency_count and ready_events
|
// system_dependency_count and ready_events
|
||||||
for system_index in prepare_system_index_range.clone() {
|
for system_index in prepare_system_index_range.clone() {
|
||||||
@ -285,7 +311,11 @@ impl ExecutorStage {
|
|||||||
);
|
);
|
||||||
|
|
||||||
for dependency in self.system_dependencies[system_index].ones() {
|
for dependency in self.system_dependencies[system_index].ones() {
|
||||||
log::trace!(" * Depends on {}", systems[dependency].lock().name());
|
log::trace!(
|
||||||
|
" * system ({}) depends on {}",
|
||||||
|
system_index,
|
||||||
|
systems[dependency].lock().name()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// This event will be awaited, preventing the task from starting until all
|
// This event will be awaited, preventing the task from starting until all
|
||||||
@ -298,7 +328,6 @@ impl ExecutorStage {
|
|||||||
if start_system_index != 0 {
|
if start_system_index != 0 {
|
||||||
if let Some(ready_event) = ready_event.as_ref() {
|
if let Some(ready_event) = ready_event.as_ref() {
|
||||||
for dependency in self.system_dependencies[system_index].ones() {
|
for dependency in self.system_dependencies[system_index].ones() {
|
||||||
log::trace!(" * Depends on {}", dependency);
|
|
||||||
if dependency < start_system_index {
|
if dependency < start_system_index {
|
||||||
ready_event.decrement();
|
ready_event.decrement();
|
||||||
}
|
}
|
||||||
@ -309,8 +338,32 @@ impl ExecutorStage {
|
|||||||
let world_ref = &*world;
|
let world_ref = &*world;
|
||||||
let resources_ref = &*resources;
|
let resources_ref = &*resources;
|
||||||
|
|
||||||
|
let dependent_systems = &self.system_dependents[system_index];
|
||||||
let trigger_events = &self.ready_events_of_dependents[system_index];
|
let trigger_events = &self.ready_events_of_dependents[system_index];
|
||||||
|
|
||||||
|
// Verify that any dependent task has a > 0 count. If a dependent task has > 0
|
||||||
|
// count, then the current system we are starting now isn't blocking it from running
|
||||||
|
// as it should be. Failure here implies the sync primitives are not matching the
|
||||||
|
// intended schedule. This likely compiles out if trace/asserts are disabled but
|
||||||
|
// make it explicitly debug-only anyways
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
{
|
||||||
|
debug_assert_eq!(trigger_events.len(), dependent_systems.len());
|
||||||
|
for (trigger_event, dependent_system_index) in
|
||||||
|
trigger_events.iter().zip(dependent_systems)
|
||||||
|
{
|
||||||
|
log::trace!(
|
||||||
|
" * system ({}) triggers events: ({}): {}",
|
||||||
|
system_index,
|
||||||
|
dependent_system_index,
|
||||||
|
trigger_event.get()
|
||||||
|
);
|
||||||
|
debug_assert!(
|
||||||
|
*dependent_system_index < start_system_index || trigger_event.get() > 0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Spawn the task
|
// Spawn the task
|
||||||
scope.spawn(async move {
|
scope.spawn(async move {
|
||||||
// Wait until our dependencies are done
|
// Wait until our dependencies are done
|
||||||
|
@ -34,6 +34,11 @@ impl CountdownEvent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the number of times decrement must be called to trigger notifying all listeners
|
||||||
|
pub fn get(&self) -> isize {
|
||||||
|
self.inner.counter.load(Ordering::Acquire)
|
||||||
|
}
|
||||||
|
|
||||||
/// Decrement the counter by one. If this is the Nth call, trigger all listeners
|
/// Decrement the counter by one. If this is the Nth call, trigger all listeners
|
||||||
pub fn decrement(&self) {
|
pub fn decrement(&self) {
|
||||||
// If we are the last decrementer, notify listeners
|
// If we are the last decrementer, notify listeners
|
||||||
|
Loading…
Reference in New Issue
Block a user