diff --git a/crates/bevy_asset/src/server/info.rs b/crates/bevy_asset/src/server/info.rs index 42fda6dd2d..c33e887a8e 100644 --- a/crates/bevy_asset/src/server/info.rs +++ b/crates/bevy_asset/src/server/info.rs @@ -8,7 +8,7 @@ use alloc::sync::{Arc, Weak}; use bevy_ecs::world::World; use bevy_tasks::Task; use bevy_utils::{tracing::warn, Entry, HashMap, HashSet, TypeIdMap}; -use core::any::TypeId; +use core::{any::TypeId, task::Waker}; use crossbeam_channel::Sender; use derive_more::derive::{Display, Error, From}; use either::Either; @@ -36,6 +36,8 @@ pub(crate) struct AssetInfo { /// The number of handle drops to skip for this asset. /// See usage (and comments) in `get_or_create_path_handle` for context. handle_drops_to_skip: usize, + /// List of tasks waiting for this asset to complete loading + pub(crate) waiting_tasks: Vec, } impl AssetInfo { @@ -54,6 +56,7 @@ impl AssetInfo { dependants_waiting_on_load: HashSet::default(), dependants_waiting_on_recursive_dep_load: HashSet::default(), handle_drops_to_skip: 0, + waiting_tasks: Vec::new(), } } } @@ -616,6 +619,9 @@ impl AssetInfos { info.load_state = LoadState::Failed(error.clone()); info.dep_load_state = DependencyLoadState::Failed(error.clone()); info.rec_dep_load_state = RecursiveDependencyLoadState::Failed(error.clone()); + for waker in info.waiting_tasks.drain(..) { + waker.wake(); + } ( core::mem::take(&mut info.dependants_waiting_on_load), core::mem::take(&mut info.dependants_waiting_on_recursive_dep_load), diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index e03f10b37e..0a48f84e65 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -25,7 +25,7 @@ use bevy_utils::{ tracing::{error, info}, HashSet, }; -use core::{any::TypeId, future::Future, panic::AssertUnwindSafe}; +use core::{any::TypeId, future::Future, panic::AssertUnwindSafe, task::Poll}; use crossbeam_channel::{Receiver, Sender}; use derive_more::derive::{Display, Error, From}; use either::Either; @@ -413,7 +413,7 @@ impl AssetServer { &self, handle: UntypedHandle, path: AssetPath<'static>, - mut infos: RwLockWriteGuard, + infos: RwLockWriteGuard, guard: G, ) { // drop the lock on `AssetInfos` before spawning a task that may block on it in single-threaded @@ -433,7 +433,10 @@ impl AssetServer { }); #[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))] - infos.pending_tasks.insert(handle.id(), task); + { + let mut infos = infos; + infos.pending_tasks.insert(handle.id(), task); + } #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] task.detach(); @@ -1336,6 +1339,132 @@ impl AssetServer { }) }) } + + /// Returns a future that will suspend until the specified asset and its dependencies finish + /// loading. + /// + /// # Errors + /// + /// This will return an error if the asset or any of its dependencies fail to load, + /// or if the asset has not been queued up to be loaded. + pub async fn wait_for_asset( + &self, + // NOTE: We take a reference to a handle so we know it will outlive the future, + // which ensures the handle won't be dropped while waiting for the asset. + handle: &Handle, + ) -> Result<(), WaitForAssetError> { + self.wait_for_asset_id(handle.id().untyped()).await + } + + /// Returns a future that will suspend until the specified asset and its dependencies finish + /// loading. + /// + /// # Errors + /// + /// This will return an error if the asset or any of its dependencies fail to load, + /// or if the asset has not been queued up to be loaded. + pub async fn wait_for_asset_untyped( + &self, + // NOTE: We take a reference to a handle so we know it will outlive the future, + // which ensures the handle won't be dropped while waiting for the asset. + handle: &UntypedHandle, + ) -> Result<(), WaitForAssetError> { + self.wait_for_asset_id(handle.id()).await + } + + /// Returns a future that will suspend until the specified asset and its dependencies finish + /// loading. + /// + /// Note that since an asset ID does not count as a reference to the asset, + /// the future returned from this method will *not* keep the asset alive. + /// This may lead to the asset unexpectedly being dropped while you are waiting for it to + /// finish loading. + /// + /// When calling this method, make sure a strong handle is stored elsewhere to prevent the + /// asset from being dropped. + /// If you have access to an asset's strong [`Handle`], you should prefer to call + /// [`AssetServer::wait_for_asset`] + /// or [`wait_for_assest_untyped`](Self::wait_for_asset_untyped) to ensure the asset finishes + /// loading. + /// + /// # Errors + /// + /// This will return an error if the asset or any of its dependencies fail to load, + /// or if the asset has not been queued up to be loaded. + pub async fn wait_for_asset_id( + &self, + id: impl Into, + ) -> Result<(), WaitForAssetError> { + let id = id.into(); + core::future::poll_fn(move |cx| self.wait_for_asset_id_poll_fn(cx, id)).await + } + + /// Used by [`wait_for_asset_id`](AssetServer::wait_for_asset_id) in [`poll_fn`](core::future::poll_fn). + fn wait_for_asset_id_poll_fn( + &self, + cx: &mut core::task::Context<'_>, + id: UntypedAssetId, + ) -> Poll> { + let infos = self.data.infos.read(); + + let Some(info) = infos.get(id) else { + return Poll::Ready(Err(WaitForAssetError::NotLoaded)); + }; + + match (&info.load_state, &info.rec_dep_load_state) { + (LoadState::Loaded, RecursiveDependencyLoadState::Loaded) => Poll::Ready(Ok(())), + // Return an error immediately if the asset is not in the process of loading + (LoadState::NotLoaded, _) => Poll::Ready(Err(WaitForAssetError::NotLoaded)), + // If the asset is loading, leave our waker behind + (LoadState::Loading, _) + | (_, RecursiveDependencyLoadState::Loading) + | (LoadState::Loaded, RecursiveDependencyLoadState::NotLoaded) => { + // Check if our waker is already there + let has_waker = info + .waiting_tasks + .iter() + .any(|waker| waker.will_wake(cx.waker())); + + if has_waker { + return Poll::Pending; + } + + let mut infos = { + // Must drop read-only guard to acquire write guard + drop(infos); + self.data.infos.write() + }; + + let Some(info) = infos.get_mut(id) else { + return Poll::Ready(Err(WaitForAssetError::NotLoaded)); + }; + + // If the load state changed while reacquiring the lock, immediately + // reawaken the task + let is_loading = matches!( + (&info.load_state, &info.rec_dep_load_state), + (LoadState::Loading, _) + | (_, RecursiveDependencyLoadState::Loading) + | (LoadState::Loaded, RecursiveDependencyLoadState::NotLoaded) + ); + + if !is_loading { + cx.waker().wake_by_ref(); + } else { + // Leave our waker behind + info.waiting_tasks.push(cx.waker().clone()); + } + + Poll::Pending + } + (LoadState::Failed(error), _) => { + Poll::Ready(Err(WaitForAssetError::Failed(error.clone()))) + } + (_, RecursiveDependencyLoadState::Failed(error)) => { + Poll::Ready(Err(WaitForAssetError::DependencyFailed(error.clone()))) + } + } + } } /// A system that manages internal [`AssetServer`] events, such as finalizing asset loads. @@ -1359,6 +1488,11 @@ pub fn handle_internal_asset_events(world: &mut World) { .get(&id.type_id()) .expect("Asset event sender should exist"); sender(world, id); + if let Some(info) = infos.get_mut(id) { + for waker in info.waiting_tasks.drain(..) { + waker.wake(); + } + } } InternalAssetEvent::Failed { id, path, error } => { infos.process_asset_fail(id, error.clone()); @@ -1710,3 +1844,12 @@ impl core::fmt::Debug for AssetServer { /// This is appended to asset sources when loading a [`LoadedUntypedAsset`]. This provides a unique /// source for a given [`AssetPath`]. const UNTYPED_SOURCE_SUFFIX: &str = "--untyped"; + +/// An error when attempting to wait asynchronously for an [`Asset`] to load. +#[derive(Error, Debug, Clone, Display)] +pub enum WaitForAssetError { + #[display("tried to wait for an asset that is not being loaded")] + NotLoaded, + Failed(Arc), + DependencyFailed(Arc), +}