Update multitask dependency to async-executor (#452)

* Switch from the deprecated `multitask` crate to `async-executor`
* async-executor appears to be essentially multitask 0.3
* use block_on in futures_lite instead of pollster because futures_lite is already in the dependency list of async-executor
This commit is contained in:
Philip Degarmo 2020-09-09 13:12:50 -07:00 committed by GitHub
parent ca4a2114a8
commit 612c2552a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 50 deletions

View File

@ -11,8 +11,8 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
multitask = "0.2" futures-lite = "1.0"
num_cpus = "1"
parking = "1"
pollster = "0.2"
event-listener = "2.4.0" event-listener = "2.4.0"
async-executor = "0.2"
async-channel = "1.4.2"
num_cpus = "1"

View File

@ -87,7 +87,7 @@ pub fn countdown_event_ready_after() {
let countdown_event = CountdownEvent::new(2); let countdown_event = CountdownEvent::new(2);
countdown_event.decrement(); countdown_event.decrement();
countdown_event.decrement(); countdown_event.decrement();
pollster::block_on(countdown_event.listen()); futures_lite::future::block_on(countdown_event.listen());
} }
#[test] #[test]
@ -95,7 +95,8 @@ pub fn countdown_event_ready() {
let countdown_event = CountdownEvent::new(2); let countdown_event = CountdownEvent::new(2);
countdown_event.decrement(); countdown_event.decrement();
let countdown_event_clone = countdown_event.clone(); let countdown_event_clone = countdown_event.clone();
let handle = std::thread::spawn(move || pollster::block_on(countdown_event_clone.listen())); let handle =
std::thread::spawn(move || futures_lite::future::block_on(countdown_event_clone.listen()));
// Pause to give the new thread time to start blocking (ugly hack) // Pause to give the new thread time to start blocking (ugly hack)
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(100));
@ -111,7 +112,7 @@ pub fn event_resets_if_listeners_are_cleared() {
// notify all listeners // notify all listeners
let listener1 = event.listen(); let listener1 = event.listen();
event.notify(std::usize::MAX); event.notify(std::usize::MAX);
pollster::block_on(listener1); futures_lite::future::block_on(listener1);
// If all listeners are notified, the structure should now be cleared. We're free to listen again // If all listeners are notified, the structure should now be cleared. We're free to listen again
let listener2 = event.listen(); let listener2 = event.listen();
@ -125,5 +126,5 @@ pub fn event_resets_if_listeners_are_cleared() {
// Notify all and verify the remaining listener is notified // Notify all and verify the remaining listener is notified
event.notify(std::usize::MAX); event.notify(std::usize::MAX);
pollster::block_on(listener3); futures_lite::future::block_on(listener3);
} }

View File

@ -13,7 +13,7 @@ use std::{
/// ///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. /// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
/// Wraps multitask::Task /// Wraps multitask::Task
pub struct Task<T>(multitask::Task<T>); pub struct Task<T>(async_executor::Task<T>);
impl<T> Task<T> { impl<T> Task<T> {
/// Detaches the task to let it keep running in the background. See `multitask::Task::detach` /// Detaches the task to let it keep running in the background. See `multitask::Task::detach`

View File

@ -1,12 +1,8 @@
use parking::Unparker;
use std::{ use std::{
future::Future, future::Future,
mem, mem,
pin::Pin, pin::Pin,
sync::{ sync::Arc,
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, JoinHandle}, thread::{self, JoinHandle},
}; };
@ -60,18 +56,15 @@ impl TaskPoolBuilder {
} }
struct TaskPoolInner { struct TaskPoolInner {
threads: Vec<(JoinHandle<()>, Arc<Unparker>)>, threads: Vec<JoinHandle<()>>,
shutdown_flag: Arc<AtomicBool>, shutdown_tx: async_channel::Sender<()>,
} }
impl Drop for TaskPoolInner { impl Drop for TaskPoolInner {
fn drop(&mut self) { fn drop(&mut self) {
self.shutdown_flag.store(true, Ordering::Release); self.shutdown_tx.close();
for (_, unparker) in &self.threads { for join_handle in self.threads.drain(..) {
unparker.unpark();
}
for (join_handle, _) in self.threads.drain(..) {
join_handle join_handle
.join() .join()
.expect("task thread panicked while executing"); .expect("task thread panicked while executing");
@ -88,7 +81,7 @@ pub struct TaskPool {
/// This has to be separate from TaskPoolInner because we have to create an Arc<Executor> to /// This has to be separate from TaskPoolInner because we have to create an Arc<Executor> to
/// pass into the worker threads, and we must create the worker threads before we can create the /// pass into the worker threads, and we must create the worker threads before we can create the
/// Vec<Task<T>> contained within TaskPoolInner /// Vec<Task<T>> contained within TaskPoolInner
executor: Arc<multitask::Executor>, executor: Arc<async_executor::Executor>,
/// Inner state of the pool /// Inner state of the pool
inner: Arc<TaskPoolInner>, inner: Arc<TaskPoolInner>,
@ -105,19 +98,16 @@ impl TaskPool {
stack_size: Option<usize>, stack_size: Option<usize>,
thread_name: Option<&str>, thread_name: Option<&str>,
) -> Self { ) -> Self {
let executor = Arc::new(multitask::Executor::new()); let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>();
let shutdown_flag = Arc::new(AtomicBool::new(false));
let executor = Arc::new(async_executor::Executor::new());
let num_threads = num_threads.unwrap_or_else(num_cpus::get); let num_threads = num_threads.unwrap_or_else(num_cpus::get);
let threads = (0..num_threads) let threads = (0..num_threads)
.map(|i| { .map(|i| {
let ex = Arc::clone(&executor); let ex = Arc::clone(&executor);
let flag = Arc::clone(&shutdown_flag); let shutdown_rx = shutdown_rx.clone();
let (p, u) = parking::pair();
let unparker = Arc::new(u);
let u = Arc::clone(&unparker);
// Run an executor thread.
let thread_name = if let Some(thread_name) = thread_name { let thread_name = if let Some(thread_name) = thread_name {
format!("{} ({})", thread_name, i) format!("{} ({})", thread_name, i)
@ -131,22 +121,13 @@ impl TaskPool {
thread_builder = thread_builder.stack_size(stack_size); thread_builder = thread_builder.stack_size(stack_size);
} }
let handle = thread_builder thread_builder
.spawn(move || { .spawn(move || {
let ticker = ex.ticker(move || u.unpark()); let shutdown_future = ex.run(shutdown_rx.recv());
loop { // Use unwrap_err because we expect a Closed error
if flag.load(Ordering::Acquire) { futures_lite::future::block_on(shutdown_future).unwrap_err();
break;
}
if !ticker.tick() {
p.park();
}
}
}) })
.expect("failed to spawn thread"); .expect("failed to spawn thread")
(handle, unparker)
}) })
.collect(); .collect();
@ -154,7 +135,7 @@ impl TaskPool {
executor, executor,
inner: Arc::new(TaskPoolInner { inner: Arc::new(TaskPoolInner {
threads, threads,
shutdown_flag, shutdown_tx,
}), }),
} }
} }
@ -178,8 +159,8 @@ impl TaskPool {
// before this function returns. However, rust has no way of knowing // before this function returns. However, rust has no way of knowing
// this so we must convert to 'static here to appease the compiler as it is unable to // this so we must convert to 'static here to appease the compiler as it is unable to
// validate safety. // validate safety.
let executor: &multitask::Executor = &*self.executor as &multitask::Executor; let executor: &async_executor::Executor = &*self.executor as &async_executor::Executor;
let executor: &'scope multitask::Executor = unsafe { mem::transmute(executor) }; let executor: &'scope async_executor::Executor = unsafe { mem::transmute(executor) };
let fut = async move { let fut = async move {
let mut scope = Scope { let mut scope = Scope {
@ -212,7 +193,7 @@ impl TaskPool {
let fut: Pin<&'static mut (dyn Future<Output = Vec<T>> + Send + 'static)> = let fut: Pin<&'static mut (dyn Future<Output = Vec<T>> + Send + 'static)> =
unsafe { mem::transmute(fut) }; unsafe { mem::transmute(fut) };
pollster::block_on(self.executor.spawn(fut)) futures_lite::future::block_on(self.executor.spawn(fut))
} }
/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
@ -236,8 +217,8 @@ impl Default for TaskPool {
} }
pub struct Scope<'scope, T> { pub struct Scope<'scope, T> {
executor: &'scope multitask::Executor, executor: &'scope async_executor::Executor,
spawned: Vec<multitask::Task<T>>, spawned: Vec<async_executor::Task<T>>,
} }
impl<'scope, T: Send + 'static> Scope<'scope, T> { impl<'scope, T: Send + 'static> Scope<'scope, T> {