# Objective
The `AssetReader` trait allows customizing the behavior of fetching
bytes for an `AssetPath`, and expects implementors to return `dyn
AsyncRead + AsyncSeek`. This gives implementors of `AssetLoader` great
flexibility to tightly integrate their asset loading behavior with the
asynchronous task system.
However, almost all implementors of `AssetLoader` don't use the async
functionality at all, and just call `AsyncReadExt::read_to_end(&mut
Vec<u8>)`. This is incredibly inefficient, as this method repeatedly
calls `poll_read` on the trait object, filling the vector 32 bytes at a
time. At my work we have assets that are hundreds of megabytes which
makes this a meaningful overhead.
## Solution
Turn the `Reader` type alias into an actual trait, with a provided
method `read_to_end`. This provided method should be more efficient than
the existing extension method, as the compiler will know the underlying
type of `Reader` when generating this function, which removes the
repeated dynamic dispatches and allows the compiler to make further
optimizations after inlining. Individual implementors are able to
override the provided implementation -- for simple asset readers that
just copy bytes from one buffer to another, this allows removing a large
amount of overhead from the provided implementation.
Now that `Reader` is an actual trait, I also improved the ergonomics for
implementing `AssetReader`. Currently, implementors are expected to box
their reader and return it as a trait object, which adds unnecessary
boilerplate to implementations. This PR changes that trait method to
return a pseudo trait alias, which allows implementors to return `impl
Reader` instead of `Box<dyn Reader>`. Now, the boilerplate for boxing
occurs in `ErasedAssetReader`.
## Testing
I made identical changes to my company's fork of bevy. Our app, which
makes heavy use of `read_to_end` for asset loading, still worked
properly after this. I am not aware if we have a more systematic way of
testing asset loading for correctness.
---
## Migration Guide
The trait method `bevy_asset::io::AssetReader::read` (and `read_meta`)
now return an opaque type instead of a boxed trait object. Implementors
of these methods should change the type signatures appropriately
```rust
impl AssetReader for MyReader {
// Before
async fn read<'a>(&'a self, path: &'a Path) -> Result<Box<Reader<'a>>, AssetReaderError> {
let reader = // construct a reader
Box::new(reader) as Box<Reader<'a>>
}
// After
async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
// create a reader
}
}
```
`bevy::asset::io::Reader` is now a trait, rather than a type alias for a
trait object. Implementors of `AssetLoader::load` will need to adjust
the method signature accordingly
```rust
impl AssetLoader for MyLoader {
async fn load<'a>(
&'a self,
// Before:
reader: &'a mut bevy::asset::io::Reader,
// After:
reader: &'a mut dyn bevy::asset::io::Reader,
_: &'a Self::Settings,
load_context: &'a mut LoadContext<'_>,
) -> Result<Self::Asset, Self::Error> {
}
```
Additionally, implementors of `AssetReader` that return a type
implementing `futures_io::AsyncRead` and `AsyncSeek` might need to
explicitly implement `bevy::asset::io::Reader` for that type.
```rust
impl bevy::asset::io::Reader for MyAsyncReadAndSeek {}
```
163 lines
5.7 KiB
Rust
163 lines
5.7 KiB
Rust
use crate::{
|
|
io::{AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader},
|
|
processor::{AssetProcessorData, ProcessStatus},
|
|
AssetPath,
|
|
};
|
|
use async_lock::RwLockReadGuardArc;
|
|
use bevy_utils::tracing::trace;
|
|
use futures_io::{AsyncRead, AsyncSeek};
|
|
use std::io::SeekFrom;
|
|
use std::task::Poll;
|
|
use std::{path::Path, pin::Pin, sync::Arc};
|
|
|
|
use super::ErasedAssetReader;
|
|
|
|
/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a
|
|
/// given path until that path has been processed by [`AssetProcessor`].
|
|
///
|
|
/// [`AssetProcessor`]: crate::processor::AssetProcessor
|
|
pub struct ProcessorGatedReader {
|
|
reader: Box<dyn ErasedAssetReader>,
|
|
source: AssetSourceId<'static>,
|
|
processor_data: Arc<AssetProcessorData>,
|
|
}
|
|
|
|
impl ProcessorGatedReader {
|
|
/// Creates a new [`ProcessorGatedReader`].
|
|
pub fn new(
|
|
source: AssetSourceId<'static>,
|
|
reader: Box<dyn ErasedAssetReader>,
|
|
processor_data: Arc<AssetProcessorData>,
|
|
) -> Self {
|
|
Self {
|
|
source,
|
|
processor_data,
|
|
reader,
|
|
}
|
|
}
|
|
|
|
/// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur
|
|
/// while it is held.
|
|
async fn get_transaction_lock(
|
|
&self,
|
|
path: &AssetPath<'static>,
|
|
) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
|
|
let infos = self.processor_data.asset_infos.read().await;
|
|
let info = infos
|
|
.get(path)
|
|
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
|
|
Ok(info.file_transaction_lock.read_arc().await)
|
|
}
|
|
}
|
|
|
|
impl AssetReader for ProcessorGatedReader {
|
|
async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
|
|
let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
|
|
trace!("Waiting for processing to finish before reading {asset_path}");
|
|
let process_result = self
|
|
.processor_data
|
|
.wait_until_processed(asset_path.clone())
|
|
.await;
|
|
match process_result {
|
|
ProcessStatus::Processed => {}
|
|
ProcessStatus::Failed | ProcessStatus::NonExistent => {
|
|
return Err(AssetReaderError::NotFound(path.to_owned()));
|
|
}
|
|
}
|
|
trace!("Processing finished with {asset_path}, reading {process_result:?}",);
|
|
let lock = self.get_transaction_lock(&asset_path).await?;
|
|
let asset_reader = self.reader.read(path).await?;
|
|
let reader = TransactionLockedReader::new(asset_reader, lock);
|
|
Ok(reader)
|
|
}
|
|
|
|
async fn read_meta<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
|
|
let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
|
|
trace!("Waiting for processing to finish before reading meta for {asset_path}",);
|
|
let process_result = self
|
|
.processor_data
|
|
.wait_until_processed(asset_path.clone())
|
|
.await;
|
|
match process_result {
|
|
ProcessStatus::Processed => {}
|
|
ProcessStatus::Failed | ProcessStatus::NonExistent => {
|
|
return Err(AssetReaderError::NotFound(path.to_owned()));
|
|
}
|
|
}
|
|
trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);
|
|
let lock = self.get_transaction_lock(&asset_path).await?;
|
|
let meta_reader = self.reader.read_meta(path).await?;
|
|
let reader = TransactionLockedReader::new(meta_reader, lock);
|
|
Ok(reader)
|
|
}
|
|
|
|
async fn read_directory<'a>(
|
|
&'a self,
|
|
path: &'a Path,
|
|
) -> Result<Box<PathStream>, AssetReaderError> {
|
|
trace!(
|
|
"Waiting for processing to finish before reading directory {:?}",
|
|
path
|
|
);
|
|
self.processor_data.wait_until_finished().await;
|
|
trace!("Processing finished, reading directory {:?}", path);
|
|
let result = self.reader.read_directory(path).await?;
|
|
Ok(result)
|
|
}
|
|
|
|
async fn is_directory<'a>(&'a self, path: &'a Path) -> Result<bool, AssetReaderError> {
|
|
trace!(
|
|
"Waiting for processing to finish before reading directory {:?}",
|
|
path
|
|
);
|
|
self.processor_data.wait_until_finished().await;
|
|
trace!("Processing finished, getting directory status {:?}", path);
|
|
let result = self.reader.is_directory(path).await?;
|
|
Ok(result)
|
|
}
|
|
}
|
|
|
|
/// An [`AsyncRead`] impl that will hold its asset's transaction lock until [`TransactionLockedReader`] is dropped.
|
|
pub struct TransactionLockedReader<'a> {
|
|
reader: Box<dyn Reader + 'a>,
|
|
_file_transaction_lock: RwLockReadGuardArc<()>,
|
|
}
|
|
|
|
impl<'a> TransactionLockedReader<'a> {
|
|
fn new(reader: Box<dyn Reader + 'a>, file_transaction_lock: RwLockReadGuardArc<()>) -> Self {
|
|
Self {
|
|
reader,
|
|
_file_transaction_lock: file_transaction_lock,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl AsyncRead for TransactionLockedReader<'_> {
|
|
fn poll_read(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut std::task::Context<'_>,
|
|
buf: &mut [u8],
|
|
) -> std::task::Poll<futures_io::Result<usize>> {
|
|
Pin::new(&mut self.reader).poll_read(cx, buf)
|
|
}
|
|
}
|
|
|
|
impl AsyncSeek for TransactionLockedReader<'_> {
|
|
fn poll_seek(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut std::task::Context<'_>,
|
|
pos: SeekFrom,
|
|
) -> Poll<std::io::Result<u64>> {
|
|
Pin::new(&mut self.reader).poll_seek(cx, pos)
|
|
}
|
|
}
|
|
|
|
impl Reader for TransactionLockedReader<'_> {
|
|
fn read_to_end<'a>(
|
|
&'a mut self,
|
|
buf: &'a mut Vec<u8>,
|
|
) -> stackfuture::StackFuture<'a, std::io::Result<usize>, { super::STACK_FUTURE_SIZE }> {
|
|
self.reader.read_to_end(buf)
|
|
}
|
|
}
|