Fix asset_debug_server hang. There should be at most one ThreadExecut… (#7825)

…or's ticker for one thread.

# Objective

- Fix debug_asset_server hang.

## Solution

- Reuse the thread_local executor for MainThreadExecutor resource, so there will be only one ThreadExecutor for main thread. 
- If ThreadTickers from same executor, they are conflict with each other. Then only tick one.
This commit is contained in:
shuo 2023-03-02 08:40:25 +00:00
parent e54103fd69
commit 239b070674
4 changed files with 45 additions and 14 deletions

View File

@ -620,11 +620,17 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World
}
/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
#[derive(Resource, Default, Clone)]
#[derive(Resource, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
impl Default for MainThreadExecutor {
fn default() -> Self {
Self::new()
}
}
impl MainThreadExecutor {
pub fn new() -> Self {
MainThreadExecutor(Arc::new(ThreadExecutor::new()))
MainThreadExecutor(TaskPool::get_thread_executor())
}
}

View File

@ -56,6 +56,11 @@ impl TaskPoolBuilder {
pub struct TaskPool {}
impl TaskPool {
/// Just create a new `ThreadExecutor` for wasm
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Arc::new(ThreadExecutor::new())
}
/// Create a `TaskPool` with the default configuration.
pub fn new() -> Self {
TaskPoolBuilder::new().build()

View File

@ -112,7 +112,12 @@ pub struct TaskPool {
impl TaskPool {
thread_local! {
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
static THREAD_EXECUTOR: ThreadExecutor<'static> = ThreadExecutor::new();
static THREAD_EXECUTOR: Arc<ThreadExecutor<'static>> = Arc::new(ThreadExecutor::new());
}
/// Each thread should only create one `ThreadExecutor`, otherwise, there are good chances they will deadlock
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Self::THREAD_EXECUTOR.with(|executor| executor.clone())
}
/// Create a `TaskPool` with the default configuration.
@ -376,9 +381,17 @@ impl TaskPool {
let tick_task_pool_executor = tick_task_pool_executor || self.threads.is_empty();
// we get this from a thread local so we should always be on the scope executors thread.
// note: it is possible `scope_executor` and `external_executor` is the same executor,
// in that case, we should only tick one of them, otherwise, it may cause deadlock.
let scope_ticker = scope_executor.ticker().unwrap();
if let Some(external_ticker) = external_executor.ticker() {
if tick_task_pool_executor {
let external_ticker = if !external_executor.is_same(scope_executor) {
external_executor.ticker()
} else {
None
};
match (external_ticker, tick_task_pool_executor) {
(Some(external_ticker), true) => {
Self::execute_global_external_scope(
executor,
external_ticker,
@ -386,14 +399,16 @@ impl TaskPool {
get_results,
)
.await
} else {
}
(Some(external_ticker), false) => {
Self::execute_external_scope(external_ticker, scope_ticker, get_results)
.await
}
} else if tick_task_pool_executor {
Self::execute_global_scope(executor, scope_ticker, get_results).await
} else {
Self::execute_scope(scope_ticker, get_results).await
// either external_executor is none or it is same as scope_executor
(None, true) => {
Self::execute_global_scope(executor, scope_ticker, get_results).await
}
(None, false) => Self::execute_scope(scope_ticker, get_results).await,
}
})
}

View File

@ -77,12 +77,17 @@ impl<'task> ThreadExecutor<'task> {
pub fn ticker<'ticker>(&'ticker self) -> Option<ThreadExecutorTicker<'task, 'ticker>> {
if thread::current().id() == self.thread_id {
return Some(ThreadExecutorTicker {
executor: &self.executor,
executor: self,
_marker: PhantomData::default(),
});
}
None
}
/// Returns true if `self` and `other`'s executor is same
pub fn is_same(&self, other: &Self) -> bool {
std::ptr::eq(self, other)
}
}
/// Used to tick the [`ThreadExecutor`]. The executor does not
@ -90,20 +95,20 @@ impl<'task> ThreadExecutor<'task> {
/// created on.
#[derive(Debug)]
pub struct ThreadExecutorTicker<'task, 'ticker> {
executor: &'ticker Executor<'task>,
executor: &'ticker ThreadExecutor<'task>,
// make type not send or sync
_marker: PhantomData<*const ()>,
}
impl<'task, 'ticker> ThreadExecutorTicker<'task, 'ticker> {
/// Tick the thread executor.
pub async fn tick(&self) {
self.executor.tick().await;
self.executor.executor.tick().await;
}
/// Synchronously try to tick a task on the executor.
/// Returns false if if does not find a task to tick.
pub fn try_tick(&self) -> bool {
self.executor.try_tick()
self.executor.executor.try_tick()
}
}