use crate::{ filesystem_watcher::FilesystemWatcher, AssetLoadError, AssetLoadRequestHandler, AssetLoader, Assets, Handle, HandleId, LoadRequest, }; use anyhow::Result; use crossbeam_channel::TryRecvError; use legion::prelude::{Res, Resources}; use std::{ collections::{HashMap, HashSet}, env, fs, io, path::{Path, PathBuf}, sync::{Arc, RwLock}, thread, }; use thiserror::Error; #[derive(Error, Debug)] pub enum AssetServerError { #[error("Asset folder path is not a directory.")] AssetFolderNotADirectory(String), #[error("Invalid root path")] InvalidRootPath, #[error("No AssetHandler found for the given extension.")] MissingAssetHandler, #[error("No AssetLoader found for the given extension.")] MissingAssetLoader, #[error("Encountered an error while loading an asset.")] AssetLoadError(#[from] AssetLoadError), #[error("Encountered an io error.")] Io(#[from] io::Error), #[error("Failed to watch asset folder.")] AssetFolderWatchError { path: PathBuf }, } struct LoaderThread { // NOTE: these must remain private. the LoaderThread Arc counters are used to determine thread liveness // if there is one reference, the loader thread is dead. if there are two references, the loader thread is active requests: Arc>>, } pub struct AssetServer { asset_folders: Vec, loader_threads: RwLock>, max_loader_threads: usize, asset_handlers: Arc>>>, // TODO: this is a hack to enable retrieving generic AssetLoaders. there must be a better way! loaders: Vec, extension_to_handler_index: HashMap, extension_to_loader_index: HashMap, path_to_handle: RwLock>, #[cfg(feature = "filesystem_watcher")] filesystem_watcher: Option, } impl Default for AssetServer { fn default() -> Self { AssetServer { max_loader_threads: 4, asset_folders: Vec::new(), loader_threads: RwLock::new(Vec::new()), asset_handlers: Arc::new(RwLock::new(Vec::new())), loaders: Vec::new(), extension_to_handler_index: HashMap::new(), extension_to_loader_index: HashMap::new(), path_to_handle: RwLock::new(HashMap::default()), #[cfg(feature = "filesystem_watcher")] filesystem_watcher: None, } } } impl AssetServer { pub fn add_handler(&mut self, asset_handler: T) where T: AssetLoadRequestHandler, { let mut asset_handlers = self.asset_handlers.write().expect("RwLock poisoned"); let handler_index = asset_handlers.len(); for extension in asset_handler.extensions().iter() { self.extension_to_handler_index .insert(extension.to_string(), handler_index); } asset_handlers.push(Box::new(asset_handler)); } pub fn add_loader(&mut self, loader: TLoader) where TLoader: AssetLoader, TAsset: 'static, { let loader_index = self.loaders.len(); for extension in loader.extensions().iter() { self.extension_to_loader_index .insert(extension.to_string(), loader_index); } let mut resources = Resources::default(); resources.insert::>>(Box::new(loader)); self.loaders.push(resources); } pub fn load_asset_folder>(&mut self, path: P) -> Result<(), AssetServerError> { let root_path = self.get_root_path()?; let asset_folder = root_path.join(path); #[cfg(feature = "filesystem_watcher")] Self::watch_folder_for_changes(&mut self.filesystem_watcher, &asset_folder)?; self.load_assets_in_folder_recursive(&asset_folder)?; self.asset_folders.push(asset_folder); Ok(()) } pub fn get_handle>(&self, path: P) -> Option> { self.path_to_handle .read() .expect("RwLock poisoned") .get(path.as_ref()) .map(|h| Handle::from(*h)) } #[cfg(feature = "filesystem_watcher")] fn watch_folder_for_changes>( filesystem_watcher: &mut Option, path: P, ) -> Result<(), AssetServerError> { if let Some(watcher) = filesystem_watcher { watcher.watch_folder(&path).map_err(|_error| { AssetServerError::AssetFolderWatchError { path: path.as_ref().to_owned(), } })?; } Ok(()) } #[cfg(feature = "filesystem_watcher")] pub fn watch_for_changes(&mut self) -> Result<(), AssetServerError> { let _ = self .filesystem_watcher .get_or_insert_with(|| FilesystemWatcher::default()); for asset_folder in self.asset_folders.iter() { Self::watch_folder_for_changes(&mut self.filesystem_watcher, asset_folder)?; } Ok(()) } #[cfg(feature = "filesystem_watcher")] pub fn filesystem_watcher_system(asset_server: Res) { use notify::event::{Event, EventKind, ModifyKind}; let mut changed = HashSet::new(); loop { if let Some(ref filesystem_watcher) = asset_server.filesystem_watcher { match filesystem_watcher.receiver.try_recv() { Ok(result) => { let event = result.unwrap(); match event { Event { kind: EventKind::Modify(ModifyKind::Data(_)), paths, .. } => { for path in paths.iter() { if !changed.contains(path) { let root_path = asset_server.get_root_path().unwrap(); let relative_path = path.strip_prefix(root_path).unwrap(); match asset_server.load_untyped(relative_path) { Ok(_) => {} Err(AssetServerError::AssetLoadError(error)) => { panic!("{:?}", error) } Err(_) => {} } } } changed.extend(paths); } _ => {} } } Err(TryRecvError::Empty) => { break; } Err(TryRecvError::Disconnected) => panic!("FilesystemWatcher disconnected"), } } else { break; } } } fn get_root_path(&self) -> Result { if let Ok(manifest_dir) = env::var("CARGO_MANIFEST_DIR") { Ok(PathBuf::from(manifest_dir)) } else { match env::current_exe() { Ok(exe_path) => exe_path .parent() .ok_or(AssetServerError::InvalidRootPath) .map(|exe_parent_path| exe_parent_path.to_owned()), Err(err) => Err(AssetServerError::Io(err)), } } } // TODO: add type checking here. people shouldn't be able to request a Handle for a Mesh asset pub fn load>(&self, path: P) -> Result, AssetServerError> { self.load_untyped(path) .map(|handle_id| Handle::from(handle_id)) } pub fn load_sync>( &self, assets: &mut Assets, path: P, ) -> Result, AssetServerError> where T: 'static, { let path = path.as_ref(); if let Some(ref extension) = path.extension() { if let Some(index) = self.extension_to_loader_index.get( extension .to_str() .expect("extension should be a valid string"), ) { let handle_id = HandleId::new(); let resources = &self.loaders[*index]; let loader = resources.get::>>().unwrap(); let asset = loader.load_from_file(path)?; let handle = Handle::from(handle_id); assets.set(handle, asset); assets.set_path(handle, path); Ok(handle) } else { Err(AssetServerError::MissingAssetHandler) } } else { Err(AssetServerError::MissingAssetHandler) } } pub fn load_untyped>(&self, path: P) -> Result { let path = path.as_ref(); if let Some(ref extension) = path.extension() { if let Some(index) = self.extension_to_handler_index.get( extension .to_str() .expect("Extension should be a valid string."), ) { let handle_id = { let mut path_to_handle = self.path_to_handle.write().expect("RwLock poisoned"); if let Some(handle_id) = path_to_handle.get(path) { *handle_id } else { let handle_id = HandleId::new(); path_to_handle.insert(path.to_owned(), handle_id.clone()); handle_id } }; self.send_request_to_loader_thread(LoadRequest { handle_id, path: path.to_owned(), handler_index: *index, }); Ok(handle_id) } else { Err(AssetServerError::MissingAssetHandler) } } else { Err(AssetServerError::MissingAssetHandler) } } fn send_request_to_loader_thread(&self, load_request: LoadRequest) { let mut loader_threads = self.loader_threads.write().expect("RwLock poisoned"); if loader_threads.len() < self.max_loader_threads { let loader_thread = LoaderThread { requests: Arc::new(RwLock::new(vec![load_request])), }; let requests = loader_thread.requests.clone(); loader_threads.push(loader_thread); Self::start_thread(self.asset_handlers.clone(), requests); } else { let most_free_thread = loader_threads .iter() .min_by_key(|l| l.requests.read().expect("RwLock poisoned").len()) .unwrap(); let mut requests = most_free_thread.requests.write().expect("RwLock poisoned"); requests.push(load_request); // if most free thread only has one reference, the thread as spun down. if so, we need to spin it back up! if Arc::strong_count(&most_free_thread.requests) == 1 { Self::start_thread( self.asset_handlers.clone(), most_free_thread.requests.clone(), ); } } } fn start_thread( request_handlers: Arc>>>, requests: Arc>>, ) { thread::spawn(move || { loop { let request = { let mut current_requests = requests.write().expect("RwLock poisoned"); if current_requests.len() == 0 { // if there are no requests, spin down the thread break; } current_requests.pop().unwrap() }; let handlers = request_handlers.read().expect("RwLock poisoned"); let request_handler = &handlers[request.handler_index]; request_handler.handle_request(&request); } }); } fn load_assets_in_folder_recursive(&self, path: &Path) -> Result<(), AssetServerError> { if !path.is_dir() { return Err(AssetServerError::AssetFolderNotADirectory( path.to_str().unwrap().to_string(), )); } let root_path = self.get_root_path()?; for entry in fs::read_dir(path)? { let entry = entry?; let child_path = entry.path(); if !child_path.is_dir() { let relative_child_path = child_path.strip_prefix(&root_path).unwrap(); let _ = self.load_untyped(relative_child_path.to_str().expect("Path should be a valid string")); } else { self.load_assets_in_folder_recursive(&child_path)?; } } Ok(()) } }