bevy/crates/bevy_asset/src/loader.rs
James Liu 012ae07dc8 Add global init and get accessors for all newtyped TaskPools (#2250)
Right now, a direct reference to the target TaskPool is required to launch tasks on the pools, despite the three newtyped pools (AsyncComputeTaskPool, ComputeTaskPool, and IoTaskPool) effectively acting as global instances. The need to pass a TaskPool reference adds notable friction to spawning subtasks within existing tasks. Possible use cases for this may include chaining tasks within the same pool like spawning separate send/receive I/O tasks after waiting on a network connection to be established, or allowing cross-pool dependent tasks like starting dependent multi-frame computations following a long I/O load. 

Other task execution runtimes provide static access to spawning tasks (i.e. `tokio::spawn`), which is notably easier to use than the reference passing required by `bevy_tasks` right now.

This PR does the following:

 * Adds `*TaskPool::init`, which initializes a `OnceCell`-backed global with a provided TaskPool, failing if the pool has already been initialized.
 * Adds `*TaskPool::get` which fetches the initialized global pool of the respective type or panics. This generally should not be an issue in normal Bevy use, as the pools are initialized before they are accessed.
 * Updated default task pool initialization to either pull the global handles and save them as resources, or if they are already initialized, pull a cloned global handle as the resource.

This should make it notably easier to build more complex task hierarchies for dependent tasks. It should also make writing bevy-adjacent, but not strictly bevy-only plugin crates easier, as the global pools ensure it's all running on the same threads.

One alternative considered is keeping a thread-local reference to the pool for all threads in each pool to enable the same `tokio::spawn` interface. This would spawn tasks on the same pool that a task is currently running in. However, this can lead to footgun situations where long-running blocking tasks run on `ComputeTaskPool`.
2022-06-09 02:43:24 +00:00

211 lines
6.0 KiB
Rust

use crate::{
path::AssetPath, AssetIo, AssetIoError, AssetMeta, AssetServer, Assets, Handle, HandleId,
RefChangeChannel,
};
use anyhow::Result;
use bevy_ecs::system::{Res, ResMut};
use bevy_reflect::{TypeUuid, TypeUuidDynamic};
use bevy_utils::{BoxedFuture, HashMap};
use crossbeam_channel::{Receiver, Sender};
use downcast_rs::{impl_downcast, Downcast};
use std::path::Path;
/// A loader for an asset source.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait AssetLoader: Send + Sync + 'static {
/// Processes the raw `bytes` of an asset.
///
/// Returns a boxed future that resolves to `Ok(())` on success or an [`anyhow::Error`] on failure.
// NOTE(review): the future yields no value, so loaded assets are presumably reported through
// `load_context` (e.g. `LoadContext::set_default_asset`) — confirm against implementors.
fn load<'a>(
&'a self,
bytes: &'a [u8],
load_context: &'a mut LoadContext,
) -> BoxedFuture<'a, Result<(), anyhow::Error>>;
/// Returns the list of extensions associated with this loader.
// NOTE(review): presumably file extensions this loader handles; whether they include a leading
// dot cannot be determined from this file — confirm against callers.
fn extensions(&self) -> &[&str];
}
/// Marker trait for asset types; it declares no methods of its own.
///
/// Blanket-implemented for every type that is `TypeUuid + AssetDynamic + TypeUuidDynamic`.
pub trait Asset: TypeUuid + AssetDynamic {}
/// Dynamic counterpart of [`Asset`], used where assets are held type-erased as
/// `Box<dyn AssetDynamic>`.
///
/// Blanket-implemented for every `Send + Sync + 'static` type that implements `TypeUuidDynamic`.
pub trait AssetDynamic: Downcast + TypeUuidDynamic + Send + Sync + 'static {}
// Generates the downcasting helpers for `dyn AssetDynamic` (this file relies on
// `Box<dyn AssetDynamic>::downcast::<T>()`).
impl_downcast!(AssetDynamic);
// Blanket impl: any type meeting the bounds is automatically an `Asset`.
impl<T> Asset for T where T: TypeUuid + AssetDynamic + TypeUuidDynamic {}
// Blanket impl: any thread-safe, `'static` type with a dynamic type UUID is an `AssetDynamic`.
impl<T> AssetDynamic for T where T: Send + Sync + 'static + TypeUuidDynamic {}
/// An asset value of type `T` bundled with the paths of the assets it depends on.
pub struct LoadedAsset<T: Asset> {
/// The asset value; `LoadedAsset::new` always stores `Some`.
// NOTE(review): presumably an `Option` so the value can be moved out later — confirm against
// the code that consumes this field.
pub(crate) value: Option<T>,
/// Paths recorded through `add_dependency`, `with_dependency` and `with_dependencies`.
pub(crate) dependencies: Vec<AssetPath<'static>>,
}
impl<T: Asset> LoadedAsset<T> {
    /// Wraps `value` as a loaded asset with no dependencies.
    pub fn new(value: T) -> Self {
        Self {
            value: Some(value),
            dependencies: Vec::new(),
        }
    }

    /// Records `asset_path` as a dependency of this asset.
    ///
    /// The path is converted with `AssetPath::to_owned` so that it can be stored as an
    /// `AssetPath<'static>` regardless of the lifetime of the argument.
    pub fn add_dependency(&mut self, asset_path: AssetPath) {
        self.dependencies.push(asset_path.to_owned());
    }

    /// Builder-style variant of [`LoadedAsset::add_dependency`]: records `asset_path` and
    /// returns the updated value.
    #[must_use]
    pub fn with_dependency(mut self, asset_path: AssetPath) -> Self {
        self.add_dependency(asset_path);
        self
    }

    /// Builder-style helper that records every path in `asset_paths`, in order, via
    /// [`LoadedAsset::add_dependency`], and returns the updated value.
    #[must_use]
    pub fn with_dependencies(mut self, asset_paths: Vec<AssetPath<'static>>) -> Self {
        // The vector is owned, so it can be consumed directly; no `mut` binding or `drain(..)`
        // is needed just to iterate it.
        for asset_path in asset_paths {
            self.add_dependency(asset_path);
        }
        self
    }
}
/// Type-erased form of `LoadedAsset<T>`: the value is stored as `Box<dyn AssetDynamic>`.
///
/// Built from a `LoadedAsset<T>` through the `From` impl in this file.
pub(crate) struct BoxedLoadedAsset {
/// The boxed asset value, or `None` if the source `LoadedAsset` held no value.
pub(crate) value: Option<Box<dyn AssetDynamic>>,
/// Dependency paths carried over unchanged from the source `LoadedAsset`.
pub(crate) dependencies: Vec<AssetPath<'static>>,
}
impl<T: Asset> From<LoadedAsset<T>> for BoxedLoadedAsset {
fn from(asset: LoadedAsset<T>) -> Self {
BoxedLoadedAsset {
value: asset
.value
.map(|value| Box::new(value) as Box<dyn AssetDynamic>),
dependencies: asset.dependencies,
}
}
}
/// Context passed to [`AssetLoader::load`]; it collects the assets registered during a load and
/// gives access to the asset path and the [`AssetIo`].
pub struct LoadContext<'a> {
/// Channel whose `sender` is cloned into every strong handle created by `get_handle`.
pub(crate) ref_change_channel: &'a RefChangeChannel,
/// The [`AssetIo`] used by `read_asset_bytes` and returned by `asset_io()`.
pub(crate) asset_io: &'a dyn AssetIo,
/// Assets registered so far, keyed by label; the `None` key holds the default asset.
pub(crate) labeled_assets: HashMap<Option<String>, BoxedLoadedAsset>,
/// Path returned by `path()` and used to build handles for labeled assets.
pub(crate) path: &'a Path,
/// Version number supplied to `LoadContext::new`.
// NOTE(review): not read anywhere in this file; presumably consumed by the asset server — confirm.
pub(crate) version: usize,
}
impl<'a> LoadContext<'a> {
pub(crate) fn new(
path: &'a Path,
ref_change_channel: &'a RefChangeChannel,
asset_io: &'a dyn AssetIo,
version: usize,
) -> Self {
Self {
ref_change_channel,
asset_io,
labeled_assets: Default::default(),
version,
path,
}
}
pub fn path(&self) -> &Path {
self.path
}
pub fn has_labeled_asset(&self, label: &str) -> bool {
self.labeled_assets.contains_key(&Some(label.to_string()))
}
pub fn set_default_asset<T: Asset>(&mut self, asset: LoadedAsset<T>) {
self.labeled_assets.insert(None, asset.into());
}
pub fn set_labeled_asset<T: Asset>(&mut self, label: &str, asset: LoadedAsset<T>) -> Handle<T> {
assert!(!label.is_empty());
self.labeled_assets
.insert(Some(label.to_string()), asset.into());
self.get_handle(AssetPath::new_ref(self.path(), Some(label)))
}
pub fn get_handle<I: Into<HandleId>, T: Asset>(&self, id: I) -> Handle<T> {
Handle::strong(id.into(), self.ref_change_channel.sender.clone())
}
pub async fn read_asset_bytes<P: AsRef<Path>>(&self, path: P) -> Result<Vec<u8>, AssetIoError> {
self.asset_io.load_path(path.as_ref()).await
}
pub fn get_asset_metas(&self) -> Vec<AssetMeta> {
let mut asset_metas = Vec::new();
for (label, asset) in &self.labeled_assets {
asset_metas.push(AssetMeta {
dependencies: asset.dependencies.clone(),
label: label.clone(),
type_uuid: asset.value.as_ref().unwrap().type_uuid(),
});
}
asset_metas
}
pub fn asset_io(&self) -> &dyn AssetIo {
self.asset_io
}
}
/// The result of loading an asset of type `T`.
#[derive(Debug)]
pub struct AssetResult<T> {
/// The asset value, boxed.
pub asset: Box<T>,
/// The handle id the asset is associated with.
pub id: HandleId,
/// Version value passed through from `AssetLifecycle::create_asset`.
pub version: usize,
}
/// A channel to send and receive [`AssetLifecycleEvent`]s; the `Create` variant carries an
/// [`AssetResult`].
///
/// The `Default` impl creates both ends from a single unbounded `crossbeam_channel`.
#[derive(Debug)]
pub struct AssetLifecycleChannel<T> {
/// Sending half; used by the `AssetLifecycle` impl to queue events.
pub sender: Sender<AssetLifecycleEvent<T>>,
/// Receiving half of the same channel.
pub receiver: Receiver<AssetLifecycleEvent<T>>,
}
/// Events sent over an [`AssetLifecycleChannel`].
pub enum AssetLifecycleEvent<T> {
/// An asset of type `T` was created; sent by `AssetLifecycle::create_asset`.
Create(AssetResult<T>),
/// The asset with the given id was freed; sent by `AssetLifecycle::free_asset`.
Free(HandleId),
}
/// Type-erased interface for reporting asset creation and removal.
///
/// Implemented in this file by `AssetLifecycleChannel<T>`, which forwards each call as an
/// `AssetLifecycleEvent` on its channel.
pub trait AssetLifecycle: Downcast + Send + Sync + 'static {
/// Reports that `asset` was created for handle `id` with the given `version`.
fn create_asset(&self, id: HandleId, asset: Box<dyn AssetDynamic>, version: usize);
/// Reports that the asset for handle `id` was freed.
fn free_asset(&self, id: HandleId);
}
// Generates the downcasting helpers for `dyn AssetLifecycle`.
impl_downcast!(AssetLifecycle);
impl<T: AssetDynamic> AssetLifecycle for AssetLifecycleChannel<T> {
    /// Downcasts `asset` to `T` and queues an `AssetLifecycleEvent::Create` for it.
    ///
    /// # Panics
    ///
    /// Panics if `asset` is not a `T`, or if sending on the channel fails.
    fn create_asset(&self, id: HandleId, asset: Box<dyn AssetDynamic>, version: usize) {
        let asset = match asset.downcast::<T>() {
            Ok(typed) => typed,
            Err(_) => panic!(
                "Failed to downcast asset to {}.",
                std::any::type_name::<T>()
            ),
        };
        let event = AssetLifecycleEvent::Create(AssetResult { asset, id, version });
        self.sender.send(event).unwrap();
    }

    /// Queues an `AssetLifecycleEvent::Free` for `id`.
    ///
    /// # Panics
    ///
    /// Panics if sending on the channel fails.
    fn free_asset(&self, id: HandleId) {
        let event = AssetLifecycleEvent::Free(id);
        self.sender.send(event).unwrap();
    }
}
impl<T> Default for AssetLifecycleChannel<T> {
    /// Creates a channel whose two halves come from one unbounded `crossbeam_channel`.
    fn default() -> Self {
        let (tx, rx) = crossbeam_channel::unbounded();
        Self {
            sender: tx,
            receiver: rx,
        }
    }
}
/// Updates the [`Assets`] collection according to the changes queued up by [`AssetServer`].
///
/// This system only forwards the `Assets<T>` resource to `AssetServer::update_asset_storage`,
/// which performs the actual update.
pub fn update_asset_storage_system<T: Asset + AssetDynamic>(
asset_server: Res<AssetServer>,
assets: ResMut<Assets<T>>,
) {
asset_server.update_asset_storage(assets);
}