tick local executor (#6121)
# Objective - #4466 broke local tasks running. - Fixes https://github.com/bevyengine/bevy/issues/6120 ## Solution - Add system for ticking local executors on main thread into bevy_core where the tasks pools are initialized. - Add ticking local executors into thread executors ## Changelog - tick all thread local executors in task pool. ## Notes - ~~Not 100% sure about this PR. Ticking the local executor for the main thread in scope feels a little kludgy as it requires users of bevy_tasks to be calling scope periodically for those tasks to make progress.~~ took this out in favor of a system that ticks the local executors.
This commit is contained in:
		
							parent
							
								
									64a8485a11
								
							
						
					
					
						commit
						0f3f628c48
					
				@ -20,3 +20,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.9.0-dev" }
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
# other
 | 
					# other
 | 
				
			||||||
bytemuck = "1.5"
 | 
					bytemuck = "1.5"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[dev-dependencies]
 | 
				
			||||||
 | 
					crossbeam-channel = "0.5.0"
 | 
				
			||||||
 | 
				
			|||||||
@ -22,6 +22,11 @@ use bevy_utils::{Duration, HashSet, Instant};
 | 
				
			|||||||
use std::borrow::Cow;
 | 
					use std::borrow::Cow;
 | 
				
			||||||
use std::ops::Range;
 | 
					use std::ops::Range;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[cfg(not(target_arch = "wasm32"))]
 | 
				
			||||||
 | 
					use bevy_ecs::schedule::IntoSystemDescriptor;
 | 
				
			||||||
 | 
					#[cfg(not(target_arch = "wasm32"))]
 | 
				
			||||||
 | 
					use bevy_tasks::tick_global_task_pools_on_main_thread;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/// Adds core functionality to Apps.
 | 
					/// Adds core functionality to Apps.
 | 
				
			||||||
#[derive(Default)]
 | 
					#[derive(Default)]
 | 
				
			||||||
pub struct CorePlugin;
 | 
					pub struct CorePlugin;
 | 
				
			||||||
@ -35,6 +40,13 @@ impl Plugin for CorePlugin {
 | 
				
			|||||||
            .unwrap_or_default()
 | 
					            .unwrap_or_default()
 | 
				
			||||||
            .create_default_pools();
 | 
					            .create_default_pools();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        #[cfg(not(target_arch = "wasm32"))]
 | 
				
			||||||
 | 
					        app.add_system_to_stage(
 | 
				
			||||||
 | 
					            bevy_app::CoreStage::Last,
 | 
				
			||||||
 | 
					            tick_global_task_pools_on_main_thread.at_end(),
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        app.register_type::<Entity>().register_type::<Name>();
 | 
				
			||||||
        app.register_type::<Entity>()
 | 
					        app.register_type::<Entity>()
 | 
				
			||||||
            .register_type::<Name>()
 | 
					            .register_type::<Name>()
 | 
				
			||||||
            .register_type::<Range<f32>>()
 | 
					            .register_type::<Range<f32>>()
 | 
				
			||||||
@ -97,3 +109,42 @@ fn register_math_types(app: &mut App) {
 | 
				
			|||||||
/// Wraps to 0 when it reaches the maximum u32 value
 | 
					/// Wraps to 0 when it reaches the maximum u32 value
 | 
				
			||||||
#[derive(Default, Resource, Clone, Copy)]
 | 
					#[derive(Default, Resource, Clone, Copy)]
 | 
				
			||||||
pub struct FrameCount(pub u32);
 | 
					pub struct FrameCount(pub u32);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[cfg(test)]
 | 
				
			||||||
 | 
					mod tests {
 | 
				
			||||||
 | 
					    use super::*;
 | 
				
			||||||
 | 
					    use bevy_tasks::prelude::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #[test]
 | 
				
			||||||
 | 
					    fn runs_spawn_local_tasks() {
 | 
				
			||||||
 | 
					        let mut app = App::new();
 | 
				
			||||||
 | 
					        app.add_plugin(CorePlugin);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let (async_tx, async_rx) = crossbeam_channel::unbounded();
 | 
				
			||||||
 | 
					        AsyncComputeTaskPool::get()
 | 
				
			||||||
 | 
					            .spawn_local(async move {
 | 
				
			||||||
 | 
					                async_tx.send(()).unwrap();
 | 
				
			||||||
 | 
					            })
 | 
				
			||||||
 | 
					            .detach();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let (compute_tx, compute_rx) = crossbeam_channel::unbounded();
 | 
				
			||||||
 | 
					        ComputeTaskPool::get()
 | 
				
			||||||
 | 
					            .spawn_local(async move {
 | 
				
			||||||
 | 
					                compute_tx.send(()).unwrap();
 | 
				
			||||||
 | 
					            })
 | 
				
			||||||
 | 
					            .detach();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let (io_tx, io_rx) = crossbeam_channel::unbounded();
 | 
				
			||||||
 | 
					        IoTaskPool::get()
 | 
				
			||||||
 | 
					            .spawn_local(async move {
 | 
				
			||||||
 | 
					                io_tx.send(()).unwrap();
 | 
				
			||||||
 | 
					            })
 | 
				
			||||||
 | 
					            .detach();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        app.run();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        async_rx.try_recv().unwrap();
 | 
				
			||||||
 | 
					        compute_rx.try_recv().unwrap();
 | 
				
			||||||
 | 
					        io_rx.try_recv().unwrap();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -18,6 +18,8 @@ mod single_threaded_task_pool;
 | 
				
			|||||||
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder};
 | 
					pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
mod usages;
 | 
					mod usages;
 | 
				
			||||||
 | 
					#[cfg(not(target_arch = "wasm32"))]
 | 
				
			||||||
 | 
					pub use usages::tick_global_task_pools_on_main_thread;
 | 
				
			||||||
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
 | 
					pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
mod iter;
 | 
					mod iter;
 | 
				
			||||||
 | 
				
			|||||||
@ -8,7 +8,7 @@ use std::{
 | 
				
			|||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use concurrent_queue::ConcurrentQueue;
 | 
					use concurrent_queue::ConcurrentQueue;
 | 
				
			||||||
use futures_lite::{future, pin};
 | 
					use futures_lite::{future, pin, FutureExt};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use crate::Task;
 | 
					use crate::Task;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -117,9 +117,16 @@ impl TaskPool {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
                thread_builder
 | 
					                thread_builder
 | 
				
			||||||
                    .spawn(move || {
 | 
					                    .spawn(move || {
 | 
				
			||||||
                        let shutdown_future = ex.run(shutdown_rx.recv());
 | 
					                        TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
 | 
				
			||||||
                        // Use unwrap_err because we expect a Closed error
 | 
					                            let tick_forever = async move {
 | 
				
			||||||
                        future::block_on(shutdown_future).unwrap_err();
 | 
					                                loop {
 | 
				
			||||||
 | 
					                                    local_executor.tick().await;
 | 
				
			||||||
 | 
					                                }
 | 
				
			||||||
 | 
					                            };
 | 
				
			||||||
 | 
					                            let shutdown_future = ex.run(tick_forever.or(shutdown_rx.recv()));
 | 
				
			||||||
 | 
					                            // Use unwrap_err because we expect a Closed error
 | 
				
			||||||
 | 
					                            future::block_on(shutdown_future).unwrap_err();
 | 
				
			||||||
 | 
					                        });
 | 
				
			||||||
                    })
 | 
					                    })
 | 
				
			||||||
                    .expect("Failed to spawn thread.")
 | 
					                    .expect("Failed to spawn thread.")
 | 
				
			||||||
            })
 | 
					            })
 | 
				
			||||||
@ -314,6 +321,24 @@ impl TaskPool {
 | 
				
			|||||||
    {
 | 
					    {
 | 
				
			||||||
        Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future)))
 | 
					        Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future)))
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /// Runs a function with the local executor. Typically used to tick
 | 
				
			||||||
 | 
					    /// the local executor on the main thread as it needs to share time with
 | 
				
			||||||
 | 
					    /// other things.
 | 
				
			||||||
 | 
					    ///
 | 
				
			||||||
 | 
					    /// ```rust
 | 
				
			||||||
 | 
					    /// use bevy_tasks::TaskPool;
 | 
				
			||||||
 | 
					    ///
 | 
				
			||||||
 | 
					    /// TaskPool::new().with_local_executor(|local_executor| {
 | 
				
			||||||
 | 
					    ///     local_executor.try_tick();
 | 
				
			||||||
 | 
					    /// });
 | 
				
			||||||
 | 
					    /// ```
 | 
				
			||||||
 | 
					    pub fn with_local_executor<F, R>(&self, f: F) -> R
 | 
				
			||||||
 | 
					    where
 | 
				
			||||||
 | 
					        F: FnOnce(&async_executor::LocalExecutor) -> R,
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        Self::LOCAL_EXECUTOR.with(f)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl Default for TaskPool {
 | 
					impl Default for TaskPool {
 | 
				
			||||||
 | 
				
			|||||||
@ -109,3 +109,29 @@ impl Deref for IoTaskPool {
 | 
				
			|||||||
        &self.0
 | 
					        &self.0
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/// Used by `bevy_core` to tick the global tasks pools on the main thread.
 | 
				
			||||||
 | 
					/// This will run a maximum of 100 local tasks per executor per call to this function.
 | 
				
			||||||
 | 
					#[cfg(not(target_arch = "wasm32"))]
 | 
				
			||||||
 | 
					pub fn tick_global_task_pools_on_main_thread() {
 | 
				
			||||||
 | 
					    COMPUTE_TASK_POOL
 | 
				
			||||||
 | 
					        .get()
 | 
				
			||||||
 | 
					        .unwrap()
 | 
				
			||||||
 | 
					        .with_local_executor(|compute_local_executor| {
 | 
				
			||||||
 | 
					            ASYNC_COMPUTE_TASK_POOL
 | 
				
			||||||
 | 
					                .get()
 | 
				
			||||||
 | 
					                .unwrap()
 | 
				
			||||||
 | 
					                .with_local_executor(|async_local_executor| {
 | 
				
			||||||
 | 
					                    IO_TASK_POOL
 | 
				
			||||||
 | 
					                        .get()
 | 
				
			||||||
 | 
					                        .unwrap()
 | 
				
			||||||
 | 
					                        .with_local_executor(|io_local_executor| {
 | 
				
			||||||
 | 
					                            for _ in 0..100 {
 | 
				
			||||||
 | 
					                                compute_local_executor.try_tick();
 | 
				
			||||||
 | 
					                                async_local_executor.try_tick();
 | 
				
			||||||
 | 
					                                io_local_executor.try_tick();
 | 
				
			||||||
 | 
					                            }
 | 
				
			||||||
 | 
					                        });
 | 
				
			||||||
 | 
					                });
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user