Task System for Bevy (#384)

Add bevy_tasks crate to replace rayon
Lachlan Sneff 2020-08-29 15:35:41 -04:00 committed by GitHub
parent db8ec7d55f
commit 17e7642611
22 changed files with 847 additions and 67 deletions

View File

@ -59,6 +59,7 @@ bevy_text = { path = "crates/bevy_text", version = "0.1" }
bevy_ui = { path = "crates/bevy_ui", version = "0.1" }
bevy_utils = { path = "crates/bevy_utils", version = "0.1" }
bevy_window = { path = "crates/bevy_window", version = "0.1" }
bevy_tasks = { path = "crates/bevy_tasks", version = "0.1" }
# bevy (optional)
bevy_audio = { path = "crates/bevy_audio", optional = true, version = "0.1" }

View File

@ -13,8 +13,10 @@ keywords = ["bevy"]
# bevy
bevy_derive = { path = "../bevy_derive", version = "0.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.1" }
bevy_math = { path = "../bevy_math", version = "0.1" }
# other
libloading = "0.6"
log = { version = "0.4", features = ["release_max_level_info"] }
serde = { version = "1.0", features = ["derive"]}

View File

@ -1,4 +1,4 @@
use crate::app_builder::AppBuilder;
use crate::{app_builder::AppBuilder, DefaultTaskPoolOptions};
use bevy_ecs::{ParallelExecutor, Resources, Schedule, World};
#[allow(clippy::needless_doctest_main)]
@ -63,6 +63,12 @@ impl App {
}
pub fn run(mut self) {
// Set up the default bevy task pools
self.resources
.get_cloned::<DefaultTaskPoolOptions>()
.unwrap_or_else(DefaultTaskPoolOptions::default)
.create_default_pools(&mut self.resources);
self.startup_schedule.initialize(&mut self.resources);
self.startup_executor.run(
&mut self.startup_schedule,

View File

@ -8,6 +8,7 @@ mod app_builder;
mod event;
mod plugin;
mod schedule_runner;
mod task_pool_options;
pub use app::*;
pub use app_builder::*;
@ -15,6 +16,7 @@ pub use bevy_derive::DynamicPlugin;
pub use event::*;
pub use plugin::*;
pub use schedule_runner::*;
pub use task_pool_options::*;
pub mod prelude {
pub use crate::{

View File

@ -0,0 +1,147 @@
use bevy_ecs::Resources;
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder};
/// Defines a simple way to determine how many threads to use given the number of remaining cores
/// and the total number of cores
#[derive(Clone)]
pub struct TaskPoolThreadAssignmentPolicy {
/// Force using at least this many threads
pub min_threads: usize,
/// Under no circumstance use more than this many threads for this pool
pub max_threads: usize,
/// Target using this percentage of total cores, clamped by min_threads and max_threads. It is
/// permitted to use 1.0 to try to use all remaining threads
pub percent: f32,
}
impl TaskPoolThreadAssignmentPolicy {
/// Determine the number of threads to use for this task pool
fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize {
assert!(self.percent >= 0.0);
let mut desired = (total_threads as f32 * self.percent).round() as usize;
// Limit ourselves to the number of cores available
desired = desired.min(remaining_threads);
// Clamp by min_threads and max_threads. (This may result in using more threads than are
// available; this is intended. An example case where this might happen is a device with
// <= 2 threads.)
bevy_math::clamp(desired, self.min_threads, self.max_threads)
}
}
/// Helper for configuring and creating the default task pools. For end-users who want full control,
/// insert the default task pools into the resource map manually. If the pools are already inserted,
/// this helper will do nothing.
#[derive(Clone)]
pub struct DefaultTaskPoolOptions {
/// If the number of logical cores is less than min_total_threads, force using min_total_threads
pub min_total_threads: usize,
/// If the number of logical cores is greater than max_total_threads, force using max_total_threads
pub max_total_threads: usize,
/// Used to determine number of IO threads to allocate
pub io: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of async compute threads to allocate
pub async_compute: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of compute threads to allocate
pub compute: TaskPoolThreadAssignmentPolicy,
}
impl Default for DefaultTaskPoolOptions {
fn default() -> Self {
DefaultTaskPoolOptions {
// By default, use however many cores are available on the system
min_total_threads: 1,
max_total_threads: std::usize::MAX,
// Use 25% of cores for IO, at least 1, no more than 4
io: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},
// Use 25% of cores for async compute, at least 1, no more than 4
async_compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},
// Use all remaining cores for compute (at least 1)
compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: std::usize::MAX,
percent: 1.0, // This 1.0 here means "whatever is left over"
},
}
}
}
impl DefaultTaskPoolOptions {
/// Create a configuration that forces using the given number of threads.
pub fn with_num_threads(thread_count: usize) -> Self {
let mut options = Self::default();
options.min_total_threads = thread_count;
options.max_total_threads = thread_count;
options
}
/// Inserts the default thread pools into the given resource map based on the configured values
pub fn create_default_pools(&self, resources: &mut Resources) {
let total_threads = bevy_math::clamp(
bevy_tasks::logical_core_count(),
self.min_total_threads,
self.max_total_threads,
);
let mut remaining_threads = total_threads;
if !resources.contains::<IOTaskPool>() {
// Determine the number of IO threads we will use
let io_threads = self
.io
.get_number_of_threads(remaining_threads, total_threads);
remaining_threads -= io_threads;
resources.insert(IOTaskPool(
TaskPoolBuilder::default()
.num_threads(io_threads)
.thread_name("IO Task Pool".to_string())
.build(),
));
}
if !resources.contains::<AsyncComputeTaskPool>() {
// Determine the number of async compute threads we will use
let async_compute_threads = self
.async_compute
.get_number_of_threads(remaining_threads, total_threads);
remaining_threads -= async_compute_threads;
resources.insert(AsyncComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(async_compute_threads)
.thread_name("Async Compute Task Pool".to_string())
.build(),
));
}
if !resources.contains::<ComputeTaskPool>() {
// Determine the number of compute threads we will use
// This is intentionally last so that an end user can specify 1.0 as the percent
let compute_threads = self
.compute
.get_number_of_threads(remaining_threads, total_threads);
resources.insert(ComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(compute_threads)
.thread_name("Compute Task Pool".to_string())
.build(),
));
}
}
}
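To make the defaults above concrete: on a machine with 8 logical cores, `create_default_pools` computes total_threads = clamp(8, 1, usize::MAX) = 8. The IO pool takes clamp(round(8 * 0.25), 1, 4) = 2 threads (6 remain), the async compute pool likewise takes 2 (4 remain), and the compute pool, which asks for 100% of the total but is limited to the remaining threads, takes the final 4.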

View File

@ -15,9 +15,9 @@ profiler = []
[dependencies]
bevy_hecs = { path = "hecs", features = ["macros", "serialize"], version = "0.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.1" }
bevy_utils = { path = "../bevy_utils", version = "0.1" }
rand = "0.7.2"
rayon = "1.3"
crossbeam-channel = "0.4.2"
fixedbitset = "0.3.0"
downcast-rs = "1.1.1"

View File

@ -43,6 +43,12 @@ impl Resources {
self.get_resource_mut(ResourceIndex::Global)
}
/// Returns a clone of the underlying resource; this is helpful when borrowing something
/// cloneable (like a task pool) without taking a borrow on the resource map
pub fn get_cloned<T: Resource + Clone>(&self) -> Option<T> {
self.get::<T>().map(|r| (*r).clone())
}
#[allow(clippy::needless_lifetimes)]
pub fn get_local<'a, T: Resource>(&'a self, id: SystemId) -> Option<Ref<'a, T>> {
self.get_resource(ResourceIndex::System(id))

View File

@ -7,7 +7,6 @@ use bevy_hecs::{ArchetypesGeneration, World};
use crossbeam_channel::{Receiver, Sender};
use fixedbitset::FixedBitSet;
use parking_lot::Mutex;
use rayon::ScopeFifo;
use std::{ops::Range, sync::Arc};
/// Executes each schedule stage in parallel by analyzing system dependencies.
@ -66,52 +65,6 @@ impl ParallelExecutor {
}
}
/// This can be added as an app resource to control the global `rayon::ThreadPool` used by ecs.
// Dev internal note: We cannot directly expose a ThreadPoolBuilder here as it does not implement Send and Sync.
#[derive(Debug, Default, Clone)]
pub struct ParallelExecutorOptions {
/// If some value, we'll set up the thread pool to use at most n threads. See `rayon::ThreadPoolBuilder::num_threads`.
num_threads: Option<usize>,
/// If some value, we'll set up the thread pool's workers to the given stack size. See `rayon::ThreadPoolBuilder::stack_size`.
stack_size: Option<usize>,
// TODO: Do we also need/want to expose other features (*_handler, etc.)
}
impl ParallelExecutorOptions {
/// Creates a new ParallelExecutorOptions instance
pub fn new() -> Self {
Self::default()
}
/// Sets the num_threads option, using the builder pattern
pub fn with_num_threads(mut self, num_threads: Option<usize>) -> Self {
self.num_threads = num_threads;
self
}
/// Sets the stack_size option, using the builder pattern. WARNING: Only use this if you know what you're doing,
/// otherwise your application may run into stability and performance issues.
pub fn with_stack_size(mut self, stack_size: Option<usize>) -> Self {
self.stack_size = stack_size;
self
}
/// Creates a new ThreadPoolBuilder based on the current options.
pub(crate) fn create_builder(&self) -> rayon::ThreadPoolBuilder {
let mut builder = rayon::ThreadPoolBuilder::new();
if let Some(num_threads) = self.num_threads {
builder = builder.num_threads(num_threads);
}
if let Some(stack_size) = self.stack_size {
builder = builder.stack_size(stack_size);
}
builder
}
}
#[derive(Debug, Clone)]
pub struct ExecutorStage {
/// each system's set of dependencies
@ -262,7 +215,7 @@ impl ExecutorStage {
&mut self,
systems: &[Arc<Mutex<Box<dyn System>>>],
run_ready_type: RunReadyType,
scope: &ScopeFifo<'run>,
scope: &mut bevy_tasks::Scope<'run, ()>,
world: &'run World,
resources: &'run Resources,
) -> RunReadyResult {
@ -308,7 +261,8 @@ impl ExecutorStage {
// handle multi-threaded system
let sender = self.sender.clone();
self.running_systems.insert(system_index);
scope.spawn_fifo(move |_| {
scope.spawn(async move {
let mut system = system.lock();
system.run(world, resources);
sender.send(system_index).unwrap();
@ -328,6 +282,10 @@ impl ExecutorStage {
systems: &[Arc<Mutex<Box<dyn System>>>],
schedule_changed: bool,
) {
let compute_pool = resources
.get_cloned::<bevy_tasks::ComputeTaskPool>()
.unwrap();
// if the schedule has changed, clear executor state / fill it with new defaults
if schedule_changed {
self.system_dependencies.clear();
@ -364,7 +322,8 @@ impl ExecutorStage {
// if there are no upcoming thread local systems, run everything right now
0..systems.len()
};
rayon::scope_fifo(|scope| {
compute_pool.scope(|scope| {
run_ready_result = self.run_ready_systems(
systems,
RunReadyType::Range(run_ready_system_index_range),
@ -373,6 +332,7 @@ impl ExecutorStage {
resources,
);
});
loop {
// if all systems in the stage are finished, break out of the loop
if self.finished_systems.count_ones(..) == systems.len() {
@ -393,7 +353,7 @@ impl ExecutorStage {
run_ready_result = RunReadyResult::Ok;
} else {
// wait for a system to finish, then run its dependents
rayon::scope_fifo(|scope| {
compute_pool.scope(|scope| {
loop {
// if all systems in the stage are finished, break out of the loop
if self.finished_systems.count_ones(..) == systems.len() {
@ -410,7 +370,7 @@ impl ExecutorStage {
resources,
);
// if the next ready system is thread local, break out of this loop/rayon scope so it can be run
// if the next ready system is thread local, break out of this loop/bevy_tasks scope so it can be run
if let RunReadyResult::ThreadLocalReady(_) = run_ready_result {
break;
}
@ -442,6 +402,7 @@ mod tests {
Commands,
};
use bevy_hecs::{Entity, World};
use bevy_tasks::{ComputeTaskPool, TaskPool};
use fixedbitset::FixedBitSet;
use parking_lot::Mutex;
use std::sync::Arc;
@ -455,6 +416,8 @@ mod tests {
fn cross_stage_archetype_change_prepare() {
let mut world = World::new();
let mut resources = Resources::default();
resources.insert(ComputeTaskPool(TaskPool::default()));
let mut schedule = Schedule::default();
schedule.add_stage("PreArchetypeChange");
schedule.add_stage("PostArchetypeChange");
@ -484,6 +447,8 @@ mod tests {
fn intra_stage_archetype_change_prepare() {
let mut world = World::new();
let mut resources = Resources::default();
resources.insert(ComputeTaskPool(TaskPool::default()));
let mut schedule = Schedule::default();
schedule.add_stage("update");
@ -512,6 +477,7 @@ mod tests {
fn schedule() {
let mut world = World::new();
let mut resources = Resources::default();
resources.insert(ComputeTaskPool(TaskPool::default()));
resources.insert(Counter::default());
resources.insert(1.0f64);
resources.insert(2isize);

View File

@ -1,6 +1,5 @@
use crate::{
resource::Resources,
schedule::ParallelExecutorOptions,
system::{System, SystemId, ThreadLocalExecution},
};
use bevy_hecs::World;
@ -168,15 +167,6 @@ impl Schedule {
return;
}
let thread_pool_builder = resources
.get::<ParallelExecutorOptions>()
.map(|options| (*options).clone())
.unwrap_or_else(ParallelExecutorOptions::default)
.create_builder();
// For now, bevy_ecs only uses the global thread pool so it is sufficient to configure it once here.
// Don't call .unwrap() as the function is called twice.
let _ = thread_pool_builder.build_global();
for stage in self.stages.values_mut() {
for system in stage.iter_mut() {
let mut system = system.lock();

View File

@ -0,0 +1,19 @@
/// A value bounded by a minimum and a maximum
///
/// If input is less than min then this returns min.
/// If input is greater than max then this returns max.
/// Otherwise this returns input.
///
/// **Panics** in debug mode if `!(min <= max)`.
///
/// Original implementation from num-traits licensed as MIT
pub fn clamp<T: PartialOrd>(input: T, min: T, max: T) -> T {
debug_assert!(min <= max, "min must be less than or equal to max");
if input < min {
min
} else if input > max {
max
} else {
input
}
}
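For example, `clamp(5, 1, 4)` returns `4`, `clamp(0, 1, 4)` returns `1`, and `clamp(3, 1, 4)` returns `3` unchanged.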

View File

@ -1,6 +1,8 @@
mod clamp;
mod face_toward;
mod geometry;
pub use clamp::*;
pub use face_toward::*;
pub use geometry::*;
pub use glam::*;

View File

@ -0,0 +1,17 @@
[package]
name = "bevy_tasks"
version = "0.1.3"
authors = [
"Bevy Contributors <bevyengine@gmail.com>",
"Lachlan Sneff <lachlan.sneff@gmail.com>",
"Philip Degarmo <aclysma@gmail.com>"
]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
multitask = "0.2"
num_cpus = "1"
parking = "1"
pollster = "0.2"

View File

@ -0,0 +1,32 @@
# bevy_tasks
A refreshingly simple task executor for bevy. :)
This is a simple thread pool with minimal dependencies. The main use case is a scoped fork-join, i.e. spawning tasks from
a single thread and having that thread await the completion of those tasks. This is intended specifically for
[`bevy`][bevy] as a lighter alternative to [`rayon`][rayon] for this use case. There are also utilities for
generating the tasks from a slice of data. This library is intended for games and makes no attempt to ensure fairness
or ordering of spawned tasks.
It is based on [`multitask`][multitask], a lightweight executor that allows the end user to manage their own threads.
`multitask` is based on async-task, a core piece of async-std.
[bevy]: https://bevyengine.org
[rayon]: https://github.com/rayon-rs/rayon
[multitask]: https://github.com/stjepang/multitask
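A minimal fork-join sketch using the `TaskPool` API this crate introduces (illustrative usage, assuming the pool's default configuration):

```rust
use bevy_tasks::TaskPool;

fn main() {
    let pool = TaskPool::new();

    // Spawn tasks from a single thread, then block until all of them complete.
    // `scope` collects each task's output in spawn order.
    let outputs: Vec<i32> = pool.scope(|s| {
        for i in 0..4 {
            s.spawn(async move { i * 2 });
        }
    });

    assert_eq!(outputs, vec![0, 2, 4, 6]);
}
```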
## Dependencies
A very small dependency list is a key feature of this module:
```
├── multitask
│ ├── async-task
│ ├── concurrent-queue
│ │ └── cache-padded
│ └── fastrand
├── num_cpus
│ └── libc
├── parking
└── pollster
```

View File

@ -0,0 +1,33 @@
use bevy_tasks::TaskPoolBuilder;
// This sample demonstrates creating a thread pool with 4 threads and spawning 40 tasks that spin
// for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical
// cores)
fn main() {
let pool = TaskPoolBuilder::new()
.thread_name("Busy Behavior ThreadPool".to_string())
.num_threads(4)
.build();
let t0 = std::time::Instant::now();
pool.scope(|s| {
for i in 0..40 {
s.spawn(async move {
let now = std::time::Instant::now();
while std::time::Instant::now() - now < std::time::Duration::from_millis(100) {
// spin, simulating work being done
}
println!(
"Thread {:?} index {} finished",
std::thread::current().id(),
i
);
})
}
});
let t1 = std::time::Instant::now();
println!("all tasks finished in {} secs", (t1 - t0).as_secs_f32());
}

View File

@ -0,0 +1,31 @@
use bevy_tasks::TaskPoolBuilder;
// This sample demonstrates a thread pool with one thread per logical core and only one task
// spinning. Other than the one thread, the system should remain idle, demonstrating good behavior
// for small workloads.
fn main() {
let pool = TaskPoolBuilder::new()
.thread_name("Idle Behavior ThreadPool".to_string())
.build();
pool.scope(|s| {
for i in 0..1 {
s.spawn(async move {
println!("Blocking for 10 seconds");
let now = std::time::Instant::now();
while std::time::Instant::now() - now < std::time::Duration::from_millis(10000) {
// spin, simulating work being done
}
println!(
"Thread {:?} index {} finished",
std::thread::current().id(),
i
);
})
}
});
println!("all tasks finished");
}

View File

@ -0,0 +1,26 @@
mod slice;
pub use slice::{ParallelSlice, ParallelSliceMut};
mod task;
pub use task::Task;
mod task_pool;
pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};
mod usages;
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool};
pub mod prelude {
pub use crate::{
slice::{ParallelSlice, ParallelSliceMut},
usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool},
};
}
pub fn logical_core_count() -> usize {
num_cpus::get()
}
pub fn physical_core_count() -> usize {
num_cpus::get_physical()
}

View File

@ -0,0 +1,116 @@
use super::TaskPool;
pub trait ParallelSlice<T: Sync>: AsRef<[T]> {
fn par_chunk_map<F, R>(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec<R>
where
F: Fn(&[T]) -> R + Send + Sync,
R: Send + 'static,
{
let slice = self.as_ref();
let f = &f;
task_pool.scope(|scope| {
for chunk in slice.chunks(chunk_size) {
scope.spawn(async move { f(chunk) });
}
})
}
fn par_splat_map<F, R>(&self, task_pool: &TaskPool, max_tasks: Option<usize>, f: F) -> Vec<R>
where
F: Fn(&[T]) -> R + Send + Sync,
R: Send + 'static,
{
let slice = self.as_ref();
let chunk_size = std::cmp::max(
1,
std::cmp::max(
slice.len() / task_pool.thread_num(),
slice.len() / max_tasks.unwrap_or(usize::MAX),
),
);
slice.par_chunk_map(task_pool, chunk_size, f)
}
}
impl<S, T: Sync> ParallelSlice<T> for S where S: AsRef<[T]> {}
pub trait ParallelSliceMut<T: Send>: AsMut<[T]> {
fn par_chunk_map_mut<F, R>(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec<R>
where
F: Fn(&mut [T]) -> R + Send + Sync,
R: Send + 'static,
{
let slice = self.as_mut();
let f = &f;
task_pool.scope(|scope| {
for chunk in slice.chunks_mut(chunk_size) {
scope.spawn(async move { f(chunk) });
}
})
}
fn par_splat_map_mut<F, R>(
&mut self,
task_pool: &TaskPool,
max_tasks: Option<usize>,
f: F,
) -> Vec<R>
where
F: Fn(&mut [T]) -> R + Send + Sync,
R: Send + 'static,
{
let slice = self.as_mut();
let chunk_size = std::cmp::max(
1,
std::cmp::max(
slice.len() / task_pool.thread_num(),
slice.len() / max_tasks.unwrap_or(usize::MAX),
),
);
slice.par_chunk_map_mut(task_pool, chunk_size, f)
}
}
impl<S, T: Send> ParallelSliceMut<T> for S where S: AsMut<[T]> {}
#[cfg(test)]
mod tests {
use crate::*;
#[test]
fn test_par_chunks_map() {
let v = vec![42; 1000];
let task_pool = TaskPool::new();
let outputs = v.par_splat_map(&task_pool, None, |numbers| -> i32 { numbers.iter().sum() });
let mut sum = 0;
for output in outputs {
sum += output;
}
assert_eq!(sum, 1000 * 42);
}
#[test]
fn test_par_chunks_map_mut() {
let mut v = vec![42; 1000];
let task_pool = TaskPool::new();
let outputs = v.par_splat_map_mut(&task_pool, None, |numbers| -> i32 {
for number in numbers.iter_mut() {
*number *= 2;
}
numbers.iter().sum()
});
let mut sum = 0;
for output in outputs {
sum += output;
}
assert_eq!(sum, 1000 * 42 * 2);
assert_eq!(v[0], 84);
}
}
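As a worked example of the chunk sizing above: a 1000-element slice on an 8-thread pool with `max_tasks = None` yields `chunk_size = max(1, max(1000 / 8, 1000 / usize::MAX)) = 125`, so `par_splat_map` spawns 8 tasks of 125 elements each, one per pool thread.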

View File

@ -0,0 +1,45 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Wraps `multitask::Task`, a spawned future.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
///
/// When a task is dropped, it gets canceled and won't be polled again. To cancel a task a bit
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
pub struct Task<T>(multitask::Task<T>);
impl<T> Task<T> {
/// Detaches the task to let it keep running in the background. See `multitask::Task::detach`
pub fn detach(self) {
self.0.detach();
}
/// Cancels the task and waits for it to stop running.
///
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
/// it didn't complete.
///
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
/// canceling because it also waits for the task to stop running.
///
/// See `multitask::Task::cancel`
pub async fn cancel(self) -> Option<T> {
self.0.cancel().await
}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Safe because Task is pinned and contains multitask::Task by value
unsafe { self.map_unchecked_mut(|x| &mut x.0).poll(cx) }
}
}

View File

@ -0,0 +1,285 @@
use parking::Unparker;
use std::{
future::Future,
mem,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, JoinHandle},
};
/// Used to create a TaskPool
#[derive(Debug, Default, Clone)]
pub struct TaskPoolBuilder {
/// If set, we'll set up the thread pool to use at most n threads. Otherwise use
/// the logical core count of the system
num_threads: Option<usize>,
/// If set, we'll use the given stack size rather than the system default
stack_size: Option<usize>,
/// Allows customizing the name of the threads - helpful for debugging. If set, threads will
/// be named <thread_name> (<thread_index>), e.g. "MyThreadPool (2)"
thread_name: Option<String>,
}
impl TaskPoolBuilder {
/// Creates a new TaskPoolBuilder instance
pub fn new() -> Self {
Self::default()
}
/// Override the number of threads created for the pool. If unset, we default to the number
/// of logical cores of the system
pub fn num_threads(mut self, num_threads: usize) -> Self {
self.num_threads = Some(num_threads);
self
}
/// Override the stack size of the threads created for the pool
pub fn stack_size(mut self, stack_size: usize) -> Self {
self.stack_size = Some(stack_size);
self
}
/// Override the name of the threads created for the pool. If set, threads will
/// be named <thread_name> (<thread_index>), e.g. "MyThreadPool (2)"
pub fn thread_name(mut self, thread_name: String) -> Self {
self.thread_name = Some(thread_name);
self
}
/// Creates a new `TaskPool` based on the current options.
pub fn build(self) -> TaskPool {
TaskPool::new_internal(
self.num_threads,
self.stack_size,
self.thread_name.as_deref(),
)
}
}
struct TaskPoolInner {
threads: Vec<(JoinHandle<()>, Arc<Unparker>)>,
shutdown_flag: Arc<AtomicBool>,
}
impl Drop for TaskPoolInner {
fn drop(&mut self) {
self.shutdown_flag.store(true, Ordering::Release);
for (_, unparker) in &self.threads {
unparker.unpark();
}
for (join_handle, _) in self.threads.drain(..) {
join_handle
.join()
.expect("task thread panicked while executing");
}
}
}
/// A thread pool for executing tasks. Tasks are futures that are automatically driven by
/// the pool on threads owned by the pool.
#[derive(Clone)]
pub struct TaskPool {
/// The executor for the pool
///
/// This has to be separate from TaskPoolInner because we have to create an Arc<Executor> to
/// pass into the worker threads, and we must create the worker threads before we can create
/// the Vec of join handles contained within TaskPoolInner
executor: Arc<multitask::Executor>,
/// Inner state of the pool
inner: Arc<TaskPoolInner>,
}
impl TaskPool {
/// Create a `TaskPool` with the default configuration.
pub fn new() -> Self {
TaskPoolBuilder::new().build()
}
fn new_internal(
num_threads: Option<usize>,
stack_size: Option<usize>,
thread_name: Option<&str>,
) -> Self {
let executor = Arc::new(multitask::Executor::new());
let shutdown_flag = Arc::new(AtomicBool::new(false));
let num_threads = num_threads.unwrap_or_else(num_cpus::get);
let threads = (0..num_threads)
.map(|i| {
let ex = Arc::clone(&executor);
let flag = Arc::clone(&shutdown_flag);
let (p, u) = parking::pair();
let unparker = Arc::new(u);
let u = Arc::clone(&unparker);
// Run an executor thread.
let thread_name = if let Some(thread_name) = thread_name {
format!("{} ({})", thread_name, i)
} else {
format!("TaskPool ({})", i)
};
let mut thread_builder = thread::Builder::new().name(thread_name);
if let Some(stack_size) = stack_size {
thread_builder = thread_builder.stack_size(stack_size);
}
let handle = thread_builder
.spawn(move || {
let ticker = ex.ticker(move || u.unpark());
loop {
if flag.load(Ordering::Acquire) {
break;
}
if !ticker.tick() {
p.park();
}
}
})
.expect("failed to spawn thread");
(handle, unparker)
})
.collect();
Self {
executor,
inner: Arc::new(TaskPoolInner {
threads,
shutdown_flag,
}),
}
}
/// Return the number of threads owned by the task pool
pub fn thread_num(&self) -> usize {
self.inner.threads.len()
}
/// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
/// passing a scope object into it. The scope object provided to the callback can be used
/// to spawn tasks. This function will await the completion of all tasks before returning.
///
/// This is similar to `rayon::scope` and `crossbeam::scope`
pub fn scope<'scope, F, T>(&self, f: F) -> Vec<T>
where
F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send,
T: Send + 'static,
{
// SAFETY: This function blocks until all futures complete, so this future must return
// before this function returns. However, rust has no way of knowing
// this so we must convert to 'static here to appease the compiler as it is unable to
// validate safety.
let executor: &multitask::Executor = &*self.executor as &multitask::Executor;
let executor: &'scope multitask::Executor = unsafe { mem::transmute(executor) };
let fut = async move {
let mut scope = Scope {
executor,
spawned: Vec::new(),
};
f(&mut scope);
let mut results = Vec::with_capacity(scope.spawned.len());
for task in scope.spawned {
results.push(task.await);
}
results
};
// Move the value to ensure that it is owned
let mut fut = fut;
// Shadow the original binding so that it can't be directly accessed
// ever again.
let fut = unsafe { Pin::new_unchecked(&mut fut) };
// SAFETY: This function blocks until all futures complete, so we do not read/write the
// data from futures outside of the 'scope lifetime. However, rust has no way of knowing
// this so we must convert to 'static here to appease the compiler as it is unable to
// validate safety.
let fut: Pin<&mut (dyn Future<Output = Vec<T>> + Send)> = fut;
let fut: Pin<&'static mut (dyn Future<Output = Vec<T>> + Send + 'static)> =
unsafe { mem::transmute(fut) };
pollster::block_on(self.executor.spawn(fut))
}
/// Spawns a static future onto the thread pool. The returned value is itself a future and
/// yields the task's output; the task can also be cancelled or "detached", allowing it to
/// continue running without having to be polled by the end-user.
pub fn spawn<T>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> impl Future<Output = T> + Send
where
T: Send + 'static,
{
self.executor.spawn(future)
}
}
impl Default for TaskPool {
fn default() -> Self {
Self::new()
}
}
pub struct Scope<'scope, T> {
executor: &'scope multitask::Executor,
spawned: Vec<multitask::Task<T>>,
}
impl<'scope, T: Send + 'static> Scope<'scope, T> {
pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
// SAFETY: This function blocks until all futures complete, so we do not read/write the
// data from futures outside of the 'scope lifetime. However, rust has no way of knowing
// this so we must convert to 'static here to appease the compiler as it is unable to
// validate safety.
let fut: Pin<Box<dyn Future<Output = T> + 'scope + Send>> = Box::pin(f);
let fut: Pin<Box<dyn Future<Output = T> + 'static + Send>> = unsafe { mem::transmute(fut) };
let task = self.executor.spawn(fut);
self.spawned.push(task);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
pub fn test_spawn() {
let pool = TaskPool::new();
let foo = Box::new(42);
let foo = &*foo;
let outputs = pool.scope(|scope| {
for i in 0..100 {
scope.spawn(async move {
println!("task {}", i);
if *foo != 42 {
panic!("not 42!?!?")
} else {
*foo
}
});
}
});
for output in outputs {
assert_eq!(output, 42);
}
}
}

View File

@ -0,0 +1,52 @@
//! Definitions for a few common task pools that we want. Generally the determining factor for what
//! kind of work should go in each pool is latency requirements.
//!
//! For CPU-intensive work (tasks that generally spin until completion) we have a standard Compute
//! pool and an AsyncCompute pool. Work that does not need to be completed to present the next
//! frame should go to the AsyncCompute pool
//!
//! For IO-intensive work (tasks that spend very little time in a "woken" state) we have an IO
//! task pool. The tasks here are expected to complete very quickly. Generally they should just
//! await receiving data from somewhere (e.g. disk) and signal other systems when the data is
//! ready for consumption (likely via channels).
use super::TaskPool;
use std::ops::Deref;
/// A newtype for a task pool for CPU-intensive work that must be completed to deliver the next
/// frame
#[derive(Clone)]
pub struct ComputeTaskPool(pub TaskPool);
impl Deref for ComputeTaskPool {
type Target = TaskPool;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// A newtype for a task pool for CPU-intensive work that may span across multiple frames
#[derive(Clone)]
pub struct AsyncComputeTaskPool(pub TaskPool);
impl Deref for AsyncComputeTaskPool {
type Target = TaskPool;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a
/// "woken" state)
#[derive(Clone)]
pub struct IOTaskPool(pub TaskPool);
impl Deref for IOTaskPool {
type Target = TaskPool;
fn deref(&self) -> &Self::Target {
&self.0
}
}
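A short sketch of pulling one of these newtypes out of `Resources` and using it (mirroring the executor tests in this PR; the payload computation is illustrative):

```rust
use bevy_ecs::Resources;
use bevy_tasks::{ComputeTaskPool, TaskPool};

fn main() {
    let mut resources = Resources::default();
    resources.insert(ComputeTaskPool(TaskPool::default()));

    // `get_cloned` avoids holding a borrow on the resource map while the pool runs.
    let pool = resources.get_cloned::<ComputeTaskPool>().unwrap();

    // The newtype derefs to `TaskPool`, so the scoped spawn API is available directly.
    let results = pool.scope(|s| {
        s.spawn(async { 6 * 7 });
    });
    assert_eq!(results, vec![42]);
}
```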

View File

@ -1,10 +1,11 @@
use bevy::{ecs::ParallelExecutorOptions, prelude::*};
use bevy::prelude::*;
use bevy_app::DefaultTaskPoolOptions;
/// This example illustrates how to customize the thread pool used internally (e.g. to only use a
/// certain number of threads).
fn main() {
App::build()
.add_resource(ParallelExecutorOptions::new().with_num_threads(Some(4)))
.add_resource(DefaultTaskPoolOptions::with_num_threads(4))
.add_default_plugins()
.run();
}

View File

@ -53,6 +53,7 @@ pub use bevy_property as property;
pub use bevy_render as render;
pub use bevy_scene as scene;
pub use bevy_sprite as sprite;
pub use bevy_tasks as tasks;
pub use bevy_text as text;
pub use bevy_transform as transform;
pub use bevy_type_registry as type_registry;