Stageless: close the finish channel so executor doesn't deadlock (#7448)
# Objective - Fix panic_when_hierachy_cycle test hanging - The problem is that the scope only awaits one task at a time in get_results. In stageless this task is the multithreaded executor. That tasks hangs when a system panics and cannot make anymore progress. This wasn't a problem before because the executor was spawned after all the system tasks had been spawned. But in stageless the executor is spawned before all the system tasks are spawned. ## Solution - We can catch unwind on each system and close the finish channel if one panics. This then causes the receiver end of the finish channel to panic too. - this might have a small perf impact, but when running many_foxes it seems to be within the noise. So less than 40us. ## Other possible solutions - It might be possible to fairly poll all the tasks in get_results in the scope. If we could do that then the scope could panic whenever one of tasks panics. It would require a data structure that we could both poll the futures through a shared ref and also push to it. I tried FuturesUnordered, but it requires an exclusive ref to poll it. - The catch unwind could be moved onto when we create the tasks for scope instead. We would then need something like a oneshot async channel to inform get_results if a task panics.
This commit is contained in:
		
							parent
							
								
									e1d741aa19
								
							
						
					
					
						commit
						ff7d5ff444
					
				| @ -1,3 +1,5 @@ | |||||||
|  | use std::panic::AssertUnwindSafe; | ||||||
|  | 
 | ||||||
| use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; | use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; | ||||||
| use bevy_utils::default; | use bevy_utils::default; | ||||||
| use bevy_utils::syncunsafecell::SyncUnsafeCell; | use bevy_utils::syncunsafecell::SyncUnsafeCell; | ||||||
| @ -175,11 +177,10 @@ impl SystemExecutor for MultiThreadedExecutor { | |||||||
| 
 | 
 | ||||||
|                         if self.num_running_systems > 0 { |                         if self.num_running_systems > 0 { | ||||||
|                             // wait for systems to complete
 |                             // wait for systems to complete
 | ||||||
|                             let index = self |                             let index = | ||||||
|                                 .receiver |                                 self.receiver.recv().await.expect( | ||||||
|                                 .recv() |                                     "A system has panicked so the executor cannot continue.", | ||||||
|                                 .await |                                 ); | ||||||
|                                 .unwrap_or_else(|error| unreachable!("{}", error)); |  | ||||||
| 
 | 
 | ||||||
|                             self.finish_system_and_signal_dependents(index); |                             self.finish_system_and_signal_dependents(index); | ||||||
| 
 | 
 | ||||||
| @ -429,14 +430,22 @@ impl MultiThreadedExecutor { | |||||||
|         let task = async move { |         let task = async move { | ||||||
|             #[cfg(feature = "trace")] |             #[cfg(feature = "trace")] | ||||||
|             let system_guard = system_span.enter(); |             let system_guard = system_span.enter(); | ||||||
|  |             let res = std::panic::catch_unwind(AssertUnwindSafe(|| { | ||||||
|                 // SAFETY: access is compatible
 |                 // SAFETY: access is compatible
 | ||||||
|                 unsafe { system.run_unsafe((), world) }; |                 unsafe { system.run_unsafe((), world) }; | ||||||
|  |             })); | ||||||
|             #[cfg(feature = "trace")] |             #[cfg(feature = "trace")] | ||||||
|             drop(system_guard); |             drop(system_guard); | ||||||
|  |             if res.is_err() { | ||||||
|  |                 // close the channel to propagate the error to the
 | ||||||
|  |                 // multithreaded executor
 | ||||||
|  |                 sender.close(); | ||||||
|  |             } else { | ||||||
|                 sender |                 sender | ||||||
|                     .send(system_index) |                     .send(system_index) | ||||||
|                     .await |                     .await | ||||||
|                     .unwrap_or_else(|error| unreachable!("{}", error)); |                     .unwrap_or_else(|error| unreachable!("{}", error)); | ||||||
|  |             } | ||||||
|         }; |         }; | ||||||
| 
 | 
 | ||||||
|         #[cfg(feature = "trace")] |         #[cfg(feature = "trace")] | ||||||
| @ -479,13 +488,21 @@ impl MultiThreadedExecutor { | |||||||
|             let task = async move { |             let task = async move { | ||||||
|                 #[cfg(feature = "trace")] |                 #[cfg(feature = "trace")] | ||||||
|                 let system_guard = system_span.enter(); |                 let system_guard = system_span.enter(); | ||||||
|  |                 let res = std::panic::catch_unwind(AssertUnwindSafe(|| { | ||||||
|                     apply_system_buffers(&unapplied_systems, systems, world); |                     apply_system_buffers(&unapplied_systems, systems, world); | ||||||
|  |                 })); | ||||||
|                 #[cfg(feature = "trace")] |                 #[cfg(feature = "trace")] | ||||||
|                 drop(system_guard); |                 drop(system_guard); | ||||||
|  |                 if res.is_err() { | ||||||
|  |                     // close the channel to propagate the error to the
 | ||||||
|  |                     // multithreaded executor
 | ||||||
|  |                     sender.close(); | ||||||
|  |                 } else { | ||||||
|                     sender |                     sender | ||||||
|                         .send(system_index) |                         .send(system_index) | ||||||
|                         .await |                         .await | ||||||
|                         .unwrap_or_else(|error| unreachable!("{}", error)); |                         .unwrap_or_else(|error| unreachable!("{}", error)); | ||||||
|  |                 } | ||||||
|             }; |             }; | ||||||
| 
 | 
 | ||||||
|             #[cfg(feature = "trace")] |             #[cfg(feature = "trace")] | ||||||
| @ -495,13 +512,21 @@ impl MultiThreadedExecutor { | |||||||
|             let task = async move { |             let task = async move { | ||||||
|                 #[cfg(feature = "trace")] |                 #[cfg(feature = "trace")] | ||||||
|                 let system_guard = system_span.enter(); |                 let system_guard = system_span.enter(); | ||||||
|  |                 let res = std::panic::catch_unwind(AssertUnwindSafe(|| { | ||||||
|                     system.run((), world); |                     system.run((), world); | ||||||
|  |                 })); | ||||||
|                 #[cfg(feature = "trace")] |                 #[cfg(feature = "trace")] | ||||||
|                 drop(system_guard); |                 drop(system_guard); | ||||||
|  |                 if res.is_err() { | ||||||
|  |                     // close the channel to propagate the error to the
 | ||||||
|  |                     // multithreaded executor
 | ||||||
|  |                     sender.close(); | ||||||
|  |                 } else { | ||||||
|                     sender |                     sender | ||||||
|                         .send(system_index) |                         .send(system_index) | ||||||
|                         .await |                         .await | ||||||
|                         .unwrap_or_else(|error| unreachable!("{}", error)); |                         .unwrap_or_else(|error| unreachable!("{}", error)); | ||||||
|  |                 } | ||||||
|             }; |             }; | ||||||
| 
 | 
 | ||||||
|             #[cfg(feature = "trace")] |             #[cfg(feature = "trace")] | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user
	 Mike
						Mike