diff -Nru rust-crossbeam-deque-0.6.3/Cargo.toml rust-crossbeam-deque-0.7.1/Cargo.toml
--- rust-crossbeam-deque-0.6.3/Cargo.toml 1970-01-01 00:00:00.000000000 +0000
+++ rust-crossbeam-deque-0.7.1/Cargo.toml 1970-01-01 00:00:00.000000000 +0000
@@ -12,10 +12,10 @@
 [package]
 name = "crossbeam-deque"
-version = "0.6.3"
+version = "0.7.1"
 authors = ["The Crossbeam Project Developers"]
 description = "Concurrent work-stealing deque"
-homepage = "https://github.com/crossbeam-rs/crossbeam"
+homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque"
 documentation = "https://docs.rs/crossbeam-deque"
 readme = "README.md"
 keywords = ["chase-lev", "lock-free", "scheduler", "scheduling"]
@@ -26,6 +26,6 @@
 version = "0.7"
 [dependencies.crossbeam-utils]
-version = "0.6"
+version = "0.6.5"
 [dev-dependencies.rand]
 version = "0.6"
diff -Nru rust-crossbeam-deque-0.6.3/Cargo.toml.orig rust-crossbeam-deque-0.7.1/Cargo.toml.orig
--- rust-crossbeam-deque-0.6.3/Cargo.toml.orig 2018-12-16 15:10:25.000000000 +0000
+++ rust-crossbeam-deque-0.7.1/Cargo.toml.orig 2019-01-29 15:45:42.000000000 +0000
@@ -4,12 +4,12 @@
 # - Update CHANGELOG.md
 # - Update README.md
 # - Create "crossbeam-deque-X.Y.Z" git tag
-version = "0.6.3"
+version = "0.7.1"
 authors = ["The Crossbeam Project Developers"]
 license = "MIT/Apache-2.0"
 readme = "README.md"
 repository = "https://github.com/crossbeam-rs/crossbeam"
-homepage = "https://github.com/crossbeam-rs/crossbeam"
+homepage = "https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque"
 documentation = "https://docs.rs/crossbeam-deque"
 description = "Concurrent work-stealing deque"
 keywords = ["chase-lev", "lock-free", "scheduler", "scheduling"]
@@ -20,7 +20,7 @@
 path = "../crossbeam-epoch"
 [dependencies.crossbeam-utils]
-version = "0.6"
+version = "0.6.5"
 path = "../crossbeam-utils"
 [dev-dependencies]
diff -Nru rust-crossbeam-deque-0.6.3/.cargo_vcs_info.json rust-crossbeam-deque-0.7.1/.cargo_vcs_info.json
--- rust-crossbeam-deque-0.6.3/.cargo_vcs_info.json 1970-01-01 00:00:00.000000000 +0000
+++ rust-crossbeam-deque-0.7.1/.cargo_vcs_info.json 1970-01-01 00:00:00.000000000 +0000
@@ -1,5 +1,5 @@
 {
   "git": {
-    "sha1": "af6056c8177af273bf0e0adfe8b1f1107547e3c3"
+    "sha1": "3953368d98c598c864afb09be5558d780e2c9759"
   }
 }
diff -Nru rust-crossbeam-deque-0.6.3/CHANGELOG.md rust-crossbeam-deque-0.7.1/CHANGELOG.md
--- rust-crossbeam-deque-0.6.3/CHANGELOG.md 2018-12-16 15:10:25.000000000 +0000
+++ rust-crossbeam-deque-0.7.1/CHANGELOG.md 2019-01-29 15:41:24.000000000 +0000
@@ -1,3 +1,17 @@
+# Version 0.7.1
+
+- Bump the minimum required version of `crossbeam-utils`.
+
+# Version 0.7.0
+
+- Make `Worker::pop()` faster in the FIFO case.
+- Replace `fifo()` and `lifo()` with `Worker::new_fifo()` and `Worker::new_lifo()`.
+- Add more batched steal methods.
+- Introduce `Injector`, a MPMC queue.
+- Rename `Steal::Data` to `Steal::Success`.
+- Add `Steal::or_else()` and implement `FromIterator` for `Steal`.
+- Add `#[must_use]` to `Steal`.
+
 # Version 0.6.3
 
 - Bump `crossbeam-epoch` to `0.7`.
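The 0.7 API changes listed above are largely mechanical to apply in dependent code. The following sketch is illustrative only (the pushed values and the use of `i32` are arbitrary, and none of this code is part of the packaged sources); it shows how code written against the 0.6 `deque::lifo()` / `Pop` / `Steal::Data` API maps onto the 0.7 constructors and variants:

```rust
extern crate crossbeam_deque;

use crossbeam_deque::{Injector, Steal, Worker};

fn main() {
    // 0.6: `let (w, s) = deque::lifo();` is replaced in 0.7 by a constructor
    // plus an explicit `stealer()` call.
    let w = Worker::new_lifo();
    let s = w.stealer();

    w.push(1);
    w.push(2);
    w.push(3);

    // `Worker::pop()` now returns `Option<T>` instead of `Pop<T>`.
    assert_eq!(w.pop(), Some(3));

    // `Steal::Data` was renamed to `Steal::Success`; stealing still takes
    // tasks from the end opposite to the LIFO pop end.
    assert_eq!(s.steal(), Steal::Success(1));
    assert_eq!(s.steal(), Steal::Success(2));
    assert_eq!(s.steal(), Steal::Empty);

    // New in 0.7: a shared `Injector` FIFO as the entry point for new tasks.
    let q = Injector::new();
    q.push(4);
    assert_eq!(q.steal(), Steal::Success(4));
}
```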
diff -Nru rust-crossbeam-deque-0.6.3/debian/cargo-checksum.json rust-crossbeam-deque-0.7.1/debian/cargo-checksum.json --- rust-crossbeam-deque-0.6.3/debian/cargo-checksum.json 2019-01-22 05:43:19.000000000 +0000 +++ rust-crossbeam-deque-0.7.1/debian/cargo-checksum.json 2019-07-16 02:26:35.000000000 +0000 @@ -1 +1 @@ -{"package":"05e44b8cf3e1a625844d1750e1f7820da46044ff6d28f4d43e455ba3e5bb2c13","files":{}} +{"package":"b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71","files":{}} diff -Nru rust-crossbeam-deque-0.6.3/debian/changelog rust-crossbeam-deque-0.7.1/debian/changelog --- rust-crossbeam-deque-0.6.3/debian/changelog 2019-01-22 05:43:19.000000000 +0000 +++ rust-crossbeam-deque-0.7.1/debian/changelog 2019-07-16 02:26:35.000000000 +0000 @@ -1,3 +1,9 @@ +rust-crossbeam-deque (0.7.1-1) unstable; urgency=medium + + * Package crossbeam-deque 0.7.1 from crates.io using debcargo 2.2.10 + + -- kpcyrd Tue, 16 Jul 2019 02:26:35 +0000 + rust-crossbeam-deque (0.6.3-1) unstable; urgency=medium * Package crossbeam-deque 0.6.3 from crates.io using debcargo 2.2.9 diff -Nru rust-crossbeam-deque-0.6.3/debian/control rust-crossbeam-deque-0.7.1/debian/control --- rust-crossbeam-deque-0.6.3/debian/control 2019-01-22 05:43:19.000000000 +0000 +++ rust-crossbeam-deque-0.7.1/debian/control 2019-07-16 02:26:35.000000000 +0000 @@ -2,12 +2,12 @@ Section: rust Priority: optional Build-Depends: debhelper (>= 11), - dh-cargo (>= 10), + dh-cargo (>= 15), cargo:native , rustc:native , libstd-rust-dev , librust-crossbeam-epoch-0.7+default-dev , - librust-crossbeam-utils-0.6+default-dev + librust-crossbeam-utils-0.6+default-dev (>= 0.6.5-~~) Maintainer: Debian Rust Maintainers Uploaders: Wolfgang Silbermayr , @@ -15,7 +15,7 @@ Standards-Version: 4.2.0 Vcs-Git: https://salsa.debian.org/rust-team/debcargo-conf.git [src/crossbeam-deque] Vcs-Browser: https://salsa.debian.org/rust-team/debcargo-conf/tree/master/src/crossbeam-deque -Homepage: https://github.com/crossbeam-rs/crossbeam +Homepage: https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-deque Package: librust-crossbeam-deque-dev Architecture: any @@ -23,15 +23,15 @@ Depends: ${misc:Depends}, librust-crossbeam-epoch-0.7+default-dev, - librust-crossbeam-utils-0.6+default-dev + librust-crossbeam-utils-0.6+default-dev (>= 0.6.5-~~) Provides: librust-crossbeam-deque+default-dev (= ${binary:Version}), librust-crossbeam-deque-0-dev (= ${binary:Version}), librust-crossbeam-deque-0+default-dev (= ${binary:Version}), - librust-crossbeam-deque-0.6-dev (= ${binary:Version}), - librust-crossbeam-deque-0.6+default-dev (= ${binary:Version}), - librust-crossbeam-deque-0.6.3-dev (= ${binary:Version}), - librust-crossbeam-deque-0.6.3+default-dev (= ${binary:Version}) + librust-crossbeam-deque-0.7-dev (= ${binary:Version}), + librust-crossbeam-deque-0.7+default-dev (= ${binary:Version}), + librust-crossbeam-deque-0.7.1-dev (= ${binary:Version}), + librust-crossbeam-deque-0.7.1+default-dev (= ${binary:Version}) Description: Concurrent work-stealing deque - Rust source code This package contains the source for the Rust crossbeam-deque crate, packaged by debcargo for use with cargo and dh-cargo. 
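The rest of the diff updates the README and rewrites `src/lib.rs` around the new API. As orientation for those changes, here is a second illustrative sketch (the helper name `steal_one` is hypothetical, not an item exported by the crate) combining the changelog's `Steal::or_else()` and `FromIterator for Steal` additions with `Injector::steal_batch_and_pop()`, in the same spirit as the `find_task` example in the new module documentation:

```rust
extern crate crossbeam_deque;

use crossbeam_deque::{Injector, Steal, Stealer, Worker};

/// Hypothetical helper: try the global injector first, then fall back to
/// stealing from the other workers via their stealers.
fn steal_one<T>(global: &Injector<T>, local: &Worker<T>, stealers: &[Stealer<T>]) -> Steal<T> {
    global
        // Move a batch into the local worker and pop one task from it.
        .steal_batch_and_pop(local)
        // `or_else()` runs only if nothing was stolen; `collect()` uses the
        // new `FromIterator` impl to fold the per-stealer results into one.
        .or_else(|| stealers.iter().map(|s| s.steal()).collect())
}

fn main() {
    let global = Injector::new();
    let local = Worker::new_fifo();

    let other = Worker::new_fifo();
    other.push(7);
    let stealers = vec![other.stealer()];

    // The injector is empty, so the task comes from the other worker.
    assert_eq!(steal_one(&global, &local, &stealers), Steal::Success(7));
}
```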
diff -Nru rust-crossbeam-deque-0.6.3/README.md rust-crossbeam-deque-0.7.1/README.md --- rust-crossbeam-deque-0.6.3/README.md 2018-12-11 10:44:51.000000000 +0000 +++ rust-crossbeam-deque-0.7.1/README.md 2019-01-28 20:25:29.000000000 +0000 @@ -8,7 +8,7 @@ https://crates.io/crates/crossbeam-deque) [![Documentation](https://docs.rs/crossbeam-deque/badge.svg)]( https://docs.rs/crossbeam-deque) -[![Rust 1.26+](https://img.shields.io/badge/rust-1.26+-lightgray.svg)]( +[![Rust 1.28+](https://img.shields.io/badge/rust-1.28+-lightgray.svg)]( https://www.rust-lang.org) This crate provides work-stealing deques, which are primarily intended for @@ -20,7 +20,7 @@ ```toml [dependencies] -crossbeam-deque = "0.6" +crossbeam-deque = "0.7" ``` Next, add this to your crate: @@ -29,12 +29,6 @@ extern crate crossbeam_deque; ``` -## Compatibility - -The minimum supported Rust version is 1.26. - -This crate does not work in `no_std` environments. - ## License Licensed under either of @@ -44,7 +38,7 @@ at your option. -### Contribution +#### Contribution Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be diff -Nru rust-crossbeam-deque-0.6.3/src/lib.rs rust-crossbeam-deque-0.7.1/src/lib.rs --- rust-crossbeam-deque-0.6.3/src/lib.rs 2018-12-11 10:44:51.000000000 +0000 +++ rust-crossbeam-deque-0.7.1/src/lib.rs 2019-01-28 11:30:47.000000000 +0000 @@ -1,64 +1,90 @@ -//! A concurrent work-stealing deque. +//! Concurrent work-stealing deques. //! -//! This data structure is most commonly used in schedulers. The typical setup involves a number of -//! threads where each thread has its own deque containing tasks. A thread may push tasks into its -//! deque as well as pop tasks from it. Once it runs out of tasks, it may steal some from other -//! threads to help complete tasks more quickly. Therefore, work-stealing deques supports three -//! essential operations: *push*, *pop*, and *steal*. +//! These data structures are most commonly used in work-stealing schedulers. The typical setup +//! involves a number of threads, each having its own FIFO or LIFO queue (*worker*). There is also +//! one global FIFO queue (*injector*) and a list of references to *worker* queues that are able to +//! steal tasks (*stealers*). //! -//! # Types of deques +//! We spawn a new task onto the scheduler by pushing it into the *injector* queue. Each worker +//! thread waits in a loop until it finds the next task to run and then runs it. To find a task, it +//! first looks into its local *worker* queue, and then into the *injector* and *stealers*. //! -//! There are two types of deques, differing only in which order tasks get pushed and popped. The -//! two task ordering strategies are: +//! # Queues //! -//! * First-in first-out (FIFO) -//! * Last-in first-out (LIFO) +//! [`Injector`] is a FIFO queue, where tasks are pushed and stolen from opposite ends. It is +//! shared among threads and is usually the entry point for new tasks. //! -//! A deque is a buffer with two ends, front and back. In a FIFO deque, tasks are pushed into the -//! back, popped from the front, and stolen from the front. However, in a LIFO deque, tasks are -//! popped from the back instead - that is the only difference. +//! [`Worker`] has two constructors: //! -//! # Workers and stealers +//! * [`new_fifo()`] - Creates a FIFO queue, in which tasks are pushed and popped from opposite +//! ends. +//! 
* [`new_lifo()`] - Creates a LIFO queue, in which tasks are pushed and popped from the same +//! end. //! -//! There are two functions that construct a deque: [`fifo`] and [`lifo`]. These functions return a -//! [`Worker`] and a [`Stealer`]. The thread which owns the deque is usually called *worker*, while -//! all other threads are *stealers*. +//! Each [`Worker`] is owned by a single thread and supports only push and pop operations. //! -//! [`Worker`] is able to push and pop tasks. It cannot be shared among multiple threads - only -//! one thread owns it. +//! Method [`stealer()`] creates a [`Stealer`] that may be shared among threads and can only steal +//! tasks from its [`Worker`]. Tasks are stolen from the end opposite to where they get pushed. //! -//! [`Stealer`] can only steal tasks. It can be shared among multiple threads by reference or by -//! cloning. Cloning a [`Stealer`] simply creates another one associated with the same deque. +//! # Stealing +//! +//! Steal operations come in three flavors: +//! +//! 1. [`steal()`] - Steals one task. +//! 2. [`steal_batch()`] - Steals a batch of tasks and moves them into another worker. +//! 3. [`steal_batch_and_pop()`] - Steals a batch of tasks, moves them into another queue, and pops +//! one task from that worker. +//! +//! In contrast to push and pop operations, stealing can spuriously fail with [`Steal::Retry`], in +//! which case the steal operation needs to be retried. //! //! # Examples //! -//! ``` -//! use crossbeam_deque::{self as deque, Pop, Steal}; -//! use std::thread; +//! Suppose a thread in a work-stealing scheduler is idle and looking for the next task to run. To +//! find an available task, it might do the following: //! -//! // Create a LIFO deque. -//! let (w, s) = deque::lifo(); +//! 1. Try popping one task from the local worker queue. +//! 2. Try stealing a batch of tasks from the global injector queue. +//! 3. Try stealing one task from another thread using the stealer list. //! -//! // Push several elements into the back. -//! w.push(1); -//! w.push(2); -//! w.push(3); -//! -//! // This is a LIFO deque, which means an element is popped from the back. -//! // If it was a FIFO deque, `w.pop()` would return `Some(1)`. -//! assert_eq!(w.pop(), Pop::Data(3)); -//! -//! // Create a stealer thread. -//! thread::spawn(move || { -//! assert_eq!(s.steal(), Steal::Data(1)); -//! assert_eq!(s.steal(), Steal::Data(2)); -//! }).join().unwrap(); +//! An implementation of this work-stealing strategy: +//! +//! ``` +//! use crossbeam_deque::{Injector, Steal, Stealer, Worker}; +//! use std::iter; +//! +//! fn find_task( +//! local: &Worker, +//! global: &Injector, +//! stealers: &[Stealer], +//! ) -> Option { +//! // Pop a task from the local queue, if not empty. +//! local.pop().or_else(|| { +//! // Otherwise, we need to look for a task elsewhere. +//! iter::repeat_with(|| { +//! // Try stealing a batch of tasks from the global queue. +//! global.steal_batch_and_pop(local) +//! // Or try stealing a task from one of the other threads. +//! .or_else(|| stealers.iter().map(|s| s.steal()).collect()) +//! }) +//! // Loop while no task was stolen and any steal operation needs to be retried. +//! .find(|s| !s.is_retry()) +//! // Extract the stolen task, if there is one. +//! .and_then(|s| s.success()) +//! }) +//! } //! ``` //! //! [`Worker`]: struct.Worker.html //! [`Stealer`]: struct.Stealer.html -//! [`fifo`]: fn.fifo.html -//! [`lifo`]: fn.lifo.html +//! [`Injector`]: struct.Injector.html +//!
[`Steal::Retry`]: enum.Steal.html#variant.Retry +//! [`new_fifo()`]: struct.Worker.html#method.new_fifo +//! [`new_lifo()`]: struct.Worker.html#method.new_lifo +//! [`stealer()`]: struct.Worker.html#method.stealer +//! [`steal()`]: struct.Stealer.html#method.steal +//! [`steal_batch()`]: struct.Stealer.html#method.steal_batch +//! [`steal_batch_and_pop()`]: struct.Stealer.html#method.steal_batch_and_pop #![warn(missing_docs)] #![warn(missing_debug_implementations)] @@ -66,111 +92,28 @@ extern crate crossbeam_epoch as epoch; extern crate crossbeam_utils as utils; -use std::cell::Cell; +use std::cell::{Cell, UnsafeCell}; use std::cmp; use std::fmt; +use std::iter::FromIterator; use std::marker::PhantomData; -use std::mem; +use std::mem::{self, ManuallyDrop}; use std::ptr; -use std::sync::atomic::{self, AtomicIsize, Ordering}; +use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; use std::sync::Arc; use epoch::{Atomic, Owned}; -use utils::CachePadded; - -/// Minimum buffer capacity for a deque. -const MIN_CAP: usize = 32; +use utils::{Backoff, CachePadded}; -/// Maximum number of additional elements that can be stolen in `steal_many`. -const MAX_BATCH: usize = 128; - -/// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets -/// deallocated as soon as possible. +// Minimum buffer capacity. +const MIN_CAP: usize = 64; +// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`. +const MAX_BATCH: usize = 32; +// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets +// deallocated as soon as possible. const FLUSH_THRESHOLD_BYTES: usize = 1 << 10; -/// Creates a work-stealing deque with the first-in first-out strategy. -/// -/// Elements are pushed into the back, popped from the front, and stolen from the front. In other -/// words, the worker side behaves as a FIFO queue. -/// -/// # Examples -/// -/// ``` -/// use crossbeam_deque::{self as deque, Pop, Steal}; -/// -/// let (w, s) = deque::fifo::(); -/// w.push(1); -/// w.push(2); -/// w.push(3); -/// -/// assert_eq!(s.steal(), Steal::Data(1)); -/// assert_eq!(w.pop(), Pop::Data(2)); -/// assert_eq!(w.pop(), Pop::Data(3)); -/// ``` -pub fn fifo() -> (Worker, Stealer) { - let buffer = Buffer::alloc(MIN_CAP); - - let inner = Arc::new(CachePadded::new(Inner { - front: AtomicIsize::new(0), - back: AtomicIsize::new(0), - buffer: Atomic::new(buffer), - })); - - let w = Worker { - inner: inner.clone(), - cached_buffer: Cell::new(buffer), - flavor: Flavor::Fifo, - _marker: PhantomData, - }; - let s = Stealer { - inner, - flavor: Flavor::Fifo, - }; - (w, s) -} - -/// Creates a work-stealing deque with the last-in first-out strategy. -/// -/// Elements are pushed into the back, popped from the back, and stolen from the front. In other -/// words, the worker side behaves as a LIFO stack. 
-/// -/// # Examples -/// -/// ``` -/// use crossbeam_deque::{self as deque, Pop, Steal}; -/// -/// let (w, s) = deque::lifo::(); -/// w.push(1); -/// w.push(2); -/// w.push(3); -/// -/// assert_eq!(s.steal(), Steal::Data(1)); -/// assert_eq!(w.pop(), Pop::Data(3)); -/// assert_eq!(w.pop(), Pop::Data(2)); -/// ``` -pub fn lifo() -> (Worker, Stealer) { - let buffer = Buffer::alloc(MIN_CAP); - - let inner = Arc::new(CachePadded::new(Inner { - front: AtomicIsize::new(0), - back: AtomicIsize::new(0), - buffer: Atomic::new(buffer), - })); - - let w = Worker { - inner: inner.clone(), - cached_buffer: Cell::new(buffer), - flavor: Flavor::Lifo, - _marker: PhantomData, - }; - let s = Stealer { - inner, - flavor: Flavor::Lifo, - }; - (w, s) -} - -/// A buffer that holds elements in a deque. +/// A buffer that holds tasks in a worker queue. /// /// This is just a pointer to the buffer and its length - dropping an instance of this struct will /// *not* deallocate the buffer. @@ -186,7 +129,7 @@ impl Buffer { /// Allocates a new buffer with the specified capacity. - fn alloc(cap: usize) -> Self { + fn alloc(cap: usize) -> Buffer { debug_assert_eq!(cap, cap.next_power_of_two()); let mut v = Vec::with_capacity(cap); @@ -201,28 +144,28 @@ drop(Vec::from_raw_parts(self.ptr, 0, self.cap)); } - /// Returns a pointer to the element at the specified `index`. + /// Returns a pointer to the task at the specified `index`. unsafe fn at(&self, index: isize) -> *mut T { // `self.cap` is always a power of two. self.ptr.offset(index & (self.cap - 1) as isize) } - /// Writes `value` into the specified `index`. + /// Writes `task` into the specified `index`. + /// + /// This method might be concurrently called with another `read` at the same index, which is + /// technically speaking a data race and therefore UB. We should use an atomic store here, but + /// that would be more expensive and difficult to implement generically for all types `T`. + /// Hence, as a hack, we use a volatile write instead. + unsafe fn write(&self, index: isize, task: T) { + ptr::write_volatile(self.at(index), task) + } + + /// Reads a task from the specified `index`. /// - /// Using this concurrently with another `read` or `write` is technically - /// speaking UB due to data races. We should be using relaxed accesses, but - /// that would cost too much performance. Hence, as a HACK, we use volatile - /// accesses instead. Experimental evidence shows that this works. - unsafe fn write(&self, index: isize, value: T) { - ptr::write_volatile(self.at(index), value) - } - - /// Reads a value from the specified `index`. - /// - /// Using this concurrently with a `write` is technically speaking UB due to - /// data races. We should be using relaxed accesses, but that would cost - /// too much performance. Hence, as a HACK, we use volatile accesses - /// instead. Experimental evidence shows that this works. + /// This method might be concurrently called with another `write` at the same index, which is + /// technically speaking a data race and therefore UB. We should use an atomic load here, but + /// that would be more expensive and difficult to implement generically for all types `T`. + /// Hence, as a hack, we use a volatile write instead. unsafe fn read(&self, index: isize) -> T { ptr::read_volatile(self.at(index)) } @@ -239,35 +182,7 @@ impl Copy for Buffer {} -/// Possible outcomes of a pop operation. -#[must_use] -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] -pub enum Pop { - /// The deque was empty at the time of popping. 
- Empty, - - /// Some data has been successfully popped. - Data(T), - - /// Lost the race for popping data to another concurrent steal operation. Try again. - Retry, -} - -/// Possible outcomes of a steal operation. -#[must_use] -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] -pub enum Steal { - /// The deque was empty at the time of stealing. - Empty, - - /// Some data has been successfully stolen. - Data(T), - - /// Lost the race for stealing data to another concurrent steal or pop operation. Try again. - Retry, -} - -/// Internal data that is shared between the worker and stealers. +/// Internal queue data shared between the worker and stealers. /// /// The implementation is based on the following work: /// @@ -288,7 +203,7 @@ back: AtomicIsize, /// The underlying buffer. - buffer: Atomic>, + buffer: CachePadded>>, } impl Drop for Inner { @@ -300,7 +215,7 @@ unsafe { let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected()); - // Go through the buffer from front to back and drop all elements in the deque. + // Go through the buffer from front to back and drop all tasks in the queue. let mut i = f; while i != b { ptr::drop_in_place(buffer.deref().at(i)); @@ -313,7 +228,7 @@ } } -/// The flavor of a deque: FIFO or LIFO. +/// Worker queue flavor: FIFO or LIFO. #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum Flavor { /// The first-in first-out flavor. @@ -323,22 +238,54 @@ Lifo, } -/// The worker side of a deque. +/// A worker queue. +/// +/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal +/// tasks from it. Task schedulers typically create a single worker queue per thread. +/// +/// # Examples +/// +/// A FIFO worker: +/// +/// ``` +/// use crossbeam_deque::{Steal, Worker}; +/// +/// let w = Worker::new_fifo(); +/// let s = w.stealer(); +/// +/// w.push(1); +/// w.push(2); +/// w.push(3); +/// +/// assert_eq!(s.steal(), Steal::Success(1)); +/// assert_eq!(w.pop(), Some(2)); +/// assert_eq!(w.pop(), Some(3)); +/// ``` +/// +/// A LIFO worker: +/// +/// ``` +/// use crossbeam_deque::{Steal, Worker}; /// -/// Workers push elements into the back and pop elements depending on the strategy: +/// let w = Worker::new_lifo(); +/// let s = w.stealer(); /// -/// * In FIFO deques, elements are popped from the front. -/// * In LIFO deques, elements are popped from the back. +/// w.push(1); +/// w.push(2); +/// w.push(3); /// -/// A deque has only one worker. Workers are not intended to be shared among multiple threads. +/// assert_eq!(s.steal(), Steal::Success(1)); +/// assert_eq!(w.pop(), Some(3)); +/// assert_eq!(w.pop(), Some(2)); +/// ``` pub struct Worker { - /// A reference to the inner representation of the deque. + /// A reference to the inner representation of the queue. inner: Arc>>, /// A copy of `inner.buffer` for quick access. - cached_buffer: Cell>, + buffer: Cell>, - /// The flavor of the deque. + /// The flavor of the queue. flavor: Flavor, /// Indicates that the worker cannot be shared among threads. @@ -348,19 +295,91 @@ unsafe impl Send for Worker {} impl Worker { + /// Creates a FIFO worker queue. + /// + /// Tasks are pushed and popped from opposite ends. 
+ /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::Worker; + /// + /// let w = Worker::::new_fifo(); + /// ``` + pub fn new_fifo() -> Worker { + let buffer = Buffer::alloc(MIN_CAP); + + let inner = Arc::new(CachePadded::new(Inner { + front: AtomicIsize::new(0), + back: AtomicIsize::new(0), + buffer: CachePadded::new(Atomic::new(buffer)), + })); + + Worker { + inner, + buffer: Cell::new(buffer), + flavor: Flavor::Fifo, + _marker: PhantomData, + } + } + + /// Creates a LIFO worker queue. + /// + /// Tasks are pushed and popped from the same end. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::Worker; + /// + /// let w = Worker::::new_lifo(); + /// ``` + pub fn new_lifo() -> Worker { + let buffer = Buffer::alloc(MIN_CAP); + + let inner = Arc::new(CachePadded::new(Inner { + front: AtomicIsize::new(0), + back: AtomicIsize::new(0), + buffer: CachePadded::new(Atomic::new(buffer)), + })); + + Worker { + inner, + buffer: Cell::new(buffer), + flavor: Flavor::Lifo, + _marker: PhantomData, + } + } + + /// Creates a stealer for this queue. + /// + /// The returned stealer can be shared among threads and cloned. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::Worker; + /// + /// let w = Worker::::new_lifo(); + /// let s = w.stealer(); + /// ``` + pub fn stealer(&self) -> Stealer { + Stealer { + inner: self.inner.clone(), + flavor: self.flavor, + } + } + /// Resizes the internal buffer to the new capacity of `new_cap`. #[cold] unsafe fn resize(&self, new_cap: usize) { // Load the back index, front index, and buffer. let b = self.inner.back.load(Ordering::Relaxed); let f = self.inner.front.load(Ordering::Relaxed); - let buffer = self.cached_buffer.get(); + let buffer = self.buffer.get(); - // Allocate a new buffer. + // Allocate a new buffer and copy data from the old buffer to the new one. let new = Buffer::alloc(new_cap); - self.cached_buffer.set(new); - - // Copy data from the old buffer to the new one. let mut i = f; while i != b { ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1); @@ -370,6 +389,7 @@ let guard = &epoch::pin(); // Replace the old buffer with the new one. + self.buffer.replace(new); let old = self.inner .buffer @@ -385,7 +405,7 @@ } } - /// Reserves enough capacity so that `reserve_cap` elements can be pushed without growing the + /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the /// buffer. fn reserve(&self, reserve_cap: usize) { if reserve_cap > 0 { @@ -395,9 +415,9 @@ let len = b.wrapping_sub(f) as usize; // The current capacity. - let cap = self.cached_buffer.get().cap; + let cap = self.buffer.get().cap; - // Is there enough capacity to push `reserve_cap` elements? + // Is there enough capacity to push `reserve_cap` tasks? if cap - len < reserve_cap { // Keep doubling the capacity as much as is needed. let mut new_cap = cap * 2; @@ -413,12 +433,13 @@ } } - /// Returns `true` if the deque is empty. + /// Returns `true` if the queue is empty. /// /// ``` - /// use crossbeam_deque as deque; + /// use crossbeam_deque::Worker; + /// + /// let w = Worker::new_lifo(); /// - /// let (w, _) = deque::lifo(); /// assert!(w.is_empty()); /// w.push(1); /// assert!(!w.is_empty()); @@ -429,38 +450,38 @@ b.wrapping_sub(f) <= 0 } - /// Pushes an element into the back of the deque. + /// Pushes a task into the queue. 
/// /// # Examples /// /// ``` - /// use crossbeam_deque as deque; + /// use crossbeam_deque::Worker; /// - /// let (w, _) = deque::lifo(); + /// let w = Worker::new_lifo(); /// w.push(1); /// w.push(2); /// ``` - pub fn push(&self, value: T) { + pub fn push(&self, task: T) { // Load the back index, front index, and buffer. let b = self.inner.back.load(Ordering::Relaxed); let f = self.inner.front.load(Ordering::Acquire); - let mut buffer = self.cached_buffer.get(); + let mut buffer = self.buffer.get(); - // Calculate the length of the deque. + // Calculate the length of the queue. let len = b.wrapping_sub(f); - // Is the deque full? + // Is the queue full? if len >= buffer.cap as isize { // Yes. Grow the underlying buffer. unsafe { self.resize(2 * buffer.cap); } - buffer = self.cached_buffer.get(); + buffer = self.buffer.get(); } - // Write `value` into the slot. + // Write `task` into the slot. unsafe { - buffer.write(b, value); + buffer.write(b, task); } atomic::fence(Ordering::Release); @@ -472,67 +493,61 @@ self.inner.back.store(b.wrapping_add(1), Ordering::Release); } - /// Pops an element from the deque. - /// - /// Which end of the deque is used depends on the strategy: - /// - /// * If this is a FIFO deque, an element is popped from the front. - /// * If this is a LIFO deque, an element is popped from the back. + /// Pops a task from the queue. /// /// # Examples /// /// ``` - /// use crossbeam_deque::{self as deque, Pop}; + /// use crossbeam_deque::Worker; /// - /// let (w, _) = deque::fifo(); + /// let w = Worker::new_fifo(); /// w.push(1); /// w.push(2); /// - /// assert_eq!(w.pop(), Pop::Data(1)); - /// assert_eq!(w.pop(), Pop::Data(2)); - /// assert_eq!(w.pop(), Pop::Empty); + /// assert_eq!(w.pop(), Some(1)); + /// assert_eq!(w.pop(), Some(2)); + /// assert_eq!(w.pop(), None); /// ``` - pub fn pop(&self) -> Pop { + pub fn pop(&self) -> Option { // Load the back and front index. let b = self.inner.back.load(Ordering::Relaxed); let f = self.inner.front.load(Ordering::Relaxed); - // Calculate the length of the deque. + // Calculate the length of the queue. let len = b.wrapping_sub(f); - // Is the deque empty? + // Is the queue empty? if len <= 0 { - return Pop::Empty; + return None; } match self.flavor { - // Pop from the front of the deque. + // Pop from the front of the queue. Flavor::Fifo => { - // Try incrementing the front index to pop the value. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - unsafe { - // Read the popped value. - let buffer = self.cached_buffer.get(); - let data = buffer.read(f); - - // Shrink the buffer if `len - 1` is less than one fourth of the capacity. - if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 { - self.resize(buffer.cap / 2); - } + // Try incrementing the front index to pop the task. + let f = self.inner.front.fetch_add(1, Ordering::SeqCst); + let new_f = f.wrapping_add(1); + + if b.wrapping_sub(new_f) < 0 { + self.inner.front.store(f, Ordering::Relaxed); + return None; + } - return Pop::Data(data); + unsafe { + // Read the popped task. + let buffer = self.buffer.get(); + let task = buffer.read(f); + + // Shrink the buffer if `len - 1` is less than one fourth of the capacity. + if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 { + self.resize(buffer.cap / 2); } - } - Pop::Retry + Some(task) + } } - // Pop from the back of the deque. + // Pop from the back of the queue. Flavor::Lifo => { // Decrement the back index. 
let b = b.wrapping_sub(1); @@ -547,15 +562,15 @@ let len = b.wrapping_sub(f); if len < 0 { - // The deque is empty. Restore the back index to the original value. + // The queue is empty. Restore the back index to the original task. self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); - Pop::Empty + None } else { - // Read the value to be popped. - let buffer = self.cached_buffer.get(); - let mut value = unsafe { Some(buffer.read(b)) }; + // Read the task to be popped. + let buffer = self.buffer.get(); + let mut task = unsafe { Some(buffer.read(b)) }; - // Are we popping the last element from the deque? + // Are we popping the last task from the queue? if len == 0 { // Try incrementing the front index. if self @@ -569,10 +584,10 @@ ).is_err() { // Failed. We didn't pop anything. - mem::forget(value.take()); + mem::forget(task.take()); } - // Restore the back index to the original value. + // Restore the back index to the original task. self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed); } else { // Shrink the buffer if `len` is less than one fourth of the capacity. @@ -583,10 +598,7 @@ } } - match value { - None => Pop::Empty, - Some(data) => Pop::Data(data), - } + task } } } @@ -599,16 +611,31 @@ } } -/// The stealer side of a deque. +/// A stealer handle of a worker queue. +/// +/// Stealers can be shared among threads. /// -/// Stealers can only steal elements from the front of the deque. +/// Task schedulers typically have a single worker queue per worker thread. /// -/// Stealers are cloneable so that they can be easily shared among multiple threads. +/// # Examples +/// +/// ``` +/// use crossbeam_deque::{Steal, Worker}; +/// +/// let w = Worker::new_lifo(); +/// w.push(1); +/// w.push(2); +/// +/// let s = w.stealer(); +/// assert_eq!(s.steal(), Steal::Success(1)); +/// assert_eq!(s.steal(), Steal::Success(2)); +/// assert_eq!(s.steal(), Steal::Empty); +/// ``` pub struct Stealer { - /// A reference to the inner representation of the deque. + /// A reference to the inner representation of the queue. inner: Arc>>, - /// The flavor of the deque. + /// The flavor of the queue. flavor: Flavor, } @@ -616,12 +643,14 @@ unsafe impl Sync for Stealer {} impl Stealer { - /// Returns `true` if the deque is empty. + /// Returns `true` if the queue is empty. /// /// ``` - /// use crossbeam_deque as deque; + /// use crossbeam_deque::Worker; + /// + /// let w = Worker::new_lifo(); + /// let s = w.stealer(); /// - /// let (w, s) = deque::lifo(); /// assert!(s.is_empty()); /// w.push(1); /// assert!(!s.is_empty()); @@ -633,20 +662,20 @@ b.wrapping_sub(f) <= 0 } - /// Steals an element from the front of the deque. + /// Steals a task from the queue. /// /// # Examples /// /// ``` - /// use crossbeam_deque::{self as deque, Steal}; + /// use crossbeam_deque::{Steal, Worker}; /// - /// let (w, s) = deque::lifo(); + /// let w = Worker::new_lifo(); /// w.push(1); /// w.push(2); /// - /// assert_eq!(s.steal(), Steal::Data(1)); - /// assert_eq!(s.steal(), Steal::Data(2)); - /// assert_eq!(s.steal(), Steal::Empty); + /// let s = w.stealer(); + /// assert_eq!(s.steal(), Steal::Success(1)); + /// assert_eq!(s.steal(), Steal::Success(2)); /// ``` pub fn steal(&self) -> Steal { // Load the front index. @@ -666,55 +695,55 @@ // Load the back index. let b = self.inner.back.load(Ordering::Acquire); - // Is the deque empty? + // Is the queue empty? if b.wrapping_sub(f) <= 0 { return Steal::Empty; } - // Load the buffer and read the value at the front. 
+ // Load the buffer and read the task at the front. let buffer = self.inner.buffer.load(Ordering::Acquire, guard); - let value = unsafe { buffer.deref().read(f) }; + let task = unsafe { buffer.deref().read(f) }; - // Try incrementing the front index to steal the value. + // Try incrementing the front index to steal the task. if self .inner .front .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) .is_err() { - // We didn't steal this value, forget it. - mem::forget(value); + // We didn't steal this task, forget it. + mem::forget(task); return Steal::Retry; } - // Return the stolen value. - Steal::Data(value) + // Return the stolen task. + Steal::Success(task) } - /// Steals elements from the front of the deque. + /// Steals a batch of tasks and pushes them into another worker. /// - /// If at least one element can be stolen, it will be returned. Additionally, some of the - /// remaining elements will be stolen and pushed into the back of worker `dest` in order to - /// balance the work among deques. There is no hard guarantee on exactly how many elements will - /// be stolen, but it should be around half of the deque. + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than some constant limit. /// /// # Examples /// /// ``` - /// use crossbeam_deque::{self as deque, Steal}; - /// - /// let (w1, s1) = deque::fifo(); - /// let (w2, s2) = deque::fifo(); + /// use crossbeam_deque::Worker; /// + /// let w1 = Worker::new_fifo(); /// w1.push(1); /// w1.push(2); /// w1.push(3); /// w1.push(4); /// - /// assert_eq!(s1.steal_many(&w2), Steal::Data(1)); - /// assert_eq!(s2.steal(), Steal::Data(2)); + /// let s = w1.stealer(); + /// let w2 = Worker::new_fifo(); + /// + /// s.steal_batch(&w2); + /// assert_eq!(w2.pop(), Some(1)); + /// assert_eq!(w2.pop(), Some(2)); /// ``` - pub fn steal_many(&self, dest: &Worker) -> Steal { + pub fn steal_batch(&self, dest: &Worker) -> Steal<()> { // Load the front index. let mut f = self.inner.front.load(Ordering::Acquire); @@ -732,33 +761,44 @@ // Load the back index. let b = self.inner.back.load(Ordering::Acquire); - // Is the deque empty? + // Is the queue empty? let len = b.wrapping_sub(f); if len <= 0 { return Steal::Empty; } - // Reserve capacity for the stolen additional elements. - let additional = cmp::min((len as usize - 1) / 2, MAX_BATCH); - dest.reserve(additional); - let additional = additional as isize; + // Reserve capacity for the stolen batch. + let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH); + dest.reserve(batch_size); + let mut batch_size = batch_size as isize; // Get the destination buffer and back index. - let dest_buffer = dest.cached_buffer.get(); + let dest_buffer = dest.buffer.get(); let mut dest_b = dest.inner.back.load(Ordering::Relaxed); - // Load the buffer and read the value at the front. + // Load the buffer. let buffer = self.inner.buffer.load(Ordering::Acquire, guard); - let value = unsafe { buffer.deref().read(f) }; match self.flavor { - // Steal a batch of elements from the front at once. + // Steal a batch of tasks from the front at once. Flavor::Fifo => { - // Copy the additional elements from the source to the destination buffer. - for i in 0..additional { - unsafe { - let value = buffer.deref().read(f.wrapping_add(i + 1)); - dest_buffer.write(dest_b.wrapping_add(i), value); + // Copy the batch from the source to the destination buffer. 
+ match dest.flavor { + Flavor::Fifo => { + for i in 0..batch_size { + unsafe { + let task = buffer.deref().read(f.wrapping_add(i)); + dest_buffer.write(dest_b.wrapping_add(i), task); + } + } + } + Flavor::Lifo => { + for i in 0..batch_size { + unsafe { + let task = buffer.deref().read(f.wrapping_add(i)); + dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); + } + } } } @@ -768,49 +808,219 @@ .front .compare_exchange( f, - f.wrapping_add(additional + 1), + f.wrapping_add(batch_size), Ordering::SeqCst, Ordering::Relaxed, ).is_err() { - // We didn't steal this value, forget it. - mem::forget(value); return Steal::Retry; } - atomic::fence(Ordering::Release); + dest_b = dest_b.wrapping_add(batch_size); + } + + // Steal a batch of tasks from the front one by one. + Flavor::Lifo => { + for i in 0..batch_size { + // If this is not the first steal, check whether the queue is empty. + if i > 0 { + // We've already got the current front index. Now execute the fence to + // synchronize with other threads. + atomic::fence(Ordering::SeqCst); + + // Load the back index. + let b = self.inner.back.load(Ordering::Acquire); + + // Is the queue empty? + if b.wrapping_sub(f) <= 0 { + batch_size = i; + break; + } + } + + // Read the task at the front. + let task = unsafe { buffer.deref().read(f) }; + + // Try incrementing the front index to steal the task. + if self + .inner + .front + .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) + .is_err() + { + // We didn't steal this task, forget it and break from the loop. + mem::forget(task); + batch_size = i; + break; + } + + // Write the stolen task into the destination buffer. + unsafe { + dest_buffer.write(dest_b, task); + } + + // Move the source front index and the destination back index one step forward. + f = f.wrapping_add(1); + dest_b = dest_b.wrapping_add(1); + } + + // If we didn't steal anything, the operation needs to be retried. + if batch_size == 0 { + return Steal::Retry; + } + + // If stealing into a FIFO queue, stolen tasks need to be reversed. + if dest.flavor == Flavor::Fifo { + for i in 0..batch_size / 2 { + unsafe { + let i1 = dest_b.wrapping_sub(batch_size - i); + let i2 = dest_b.wrapping_sub(i + 1); + let t1 = dest_buffer.read(i1); + let t2 = dest_buffer.read(i2); + dest_buffer.write(i1, t2); + dest_buffer.write(i2, t1); + } + } + } + } + } + + atomic::fence(Ordering::Release); + + // Update the back index in the destination queue. + // + // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data + // races because it doesn't understand fences. + dest.inner + .back + .store(dest_b, Ordering::Release); + + // Return with success. + Steal::Success(()) + } + + /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker. + /// + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than some constant limit. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::{Steal, Worker}; + /// + /// let w1 = Worker::new_fifo(); + /// w1.push(1); + /// w1.push(2); + /// w1.push(3); + /// w1.push(4); + /// + /// let s = w1.stealer(); + /// let w2 = Worker::new_fifo(); + /// + /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1)); + /// assert_eq!(w2.pop(), Some(2)); + /// ``` + pub fn steal_batch_and_pop(&self, dest: &Worker) -> Steal { + // Load the front index. 
+ let mut f = self.inner.front.load(Ordering::Acquire); + + // A SeqCst fence is needed here. + // + // If the current thread is already pinned (reentrantly), we must manually issue the + // fence. Otherwise, the following pinning will issue the fence anyway, so we don't + // have to. + if epoch::is_pinned() { + atomic::fence(Ordering::SeqCst); + } + + let guard = &epoch::pin(); + + // Load the back index. + let b = self.inner.back.load(Ordering::Acquire); + + // Is the queue empty? + let len = b.wrapping_sub(f); + if len <= 0 { + return Steal::Empty; + } + + // Reserve capacity for the stolen batch. + let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1); + dest.reserve(batch_size); + let mut batch_size = batch_size as isize; + + // Get the destination buffer and back index. + let dest_buffer = dest.buffer.get(); + let mut dest_b = dest.inner.back.load(Ordering::Relaxed); + + // Load the buffer + let buffer = self.inner.buffer.load(Ordering::Acquire, guard); + + // Read the task at the front. + let mut task = unsafe { buffer.deref().read(f) }; + + match self.flavor { + // Steal a batch of tasks from the front at once. + Flavor::Fifo => { + // Copy the batch from the source to the destination buffer. + match dest.flavor { + Flavor::Fifo => { + for i in 0..batch_size { + unsafe { + let task = buffer.deref().read(f.wrapping_add(i + 1)); + dest_buffer.write(dest_b.wrapping_add(i), task); + } + } + } + Flavor::Lifo => { + for i in 0..batch_size { + unsafe { + let task = buffer.deref().read(f.wrapping_add(i + 1)); + dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task); + } + } + } + } - // Success! Update the back index in the destination deque. - // - // This ordering could be `Relaxed`, but then thread sanitizer would falsely report - // data races because it doesn't understand fences. - dest.inner - .back - .store(dest_b.wrapping_add(additional), Ordering::Release); + // Try incrementing the front index to steal the batch. + if self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(batch_size + 1), + Ordering::SeqCst, + Ordering::Relaxed, + ).is_err() + { + // We didn't steal this task, forget it. + mem::forget(task); + return Steal::Retry; + } - // Return the first stolen value. - Steal::Data(value) + dest_b = dest_b.wrapping_add(batch_size); } - // Steal a batch of elements from the front one by one. + // Steal a batch of tasks from the front one by one. Flavor::Lifo => { - // Try incrementing the front index to steal the value. + // Try incrementing the front index to steal the task. if self .inner .front .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) .is_err() { - // We didn't steal this value, forget it. - mem::forget(value); + // We didn't steal this task, forget it. + mem::forget(task); return Steal::Retry; } // Move the front index one step forward. f = f.wrapping_add(1); - // Repeat the same procedure for the additional steals. - for _ in 0..additional { + // Repeat the same procedure for the batch steals. + for i in 0..batch_size { // We've already got the current front index. Now execute the fence to // synchronize with other threads. atomic::fence(Ordering::SeqCst); @@ -818,48 +1028,66 @@ // Load the back index. let b = self.inner.back.load(Ordering::Acquire); - // Is the deque empty? + // Is the queue empty? if b.wrapping_sub(f) <= 0 { + batch_size = i; break; } - // Read the value at the front. - let value = unsafe { buffer.deref().read(f) }; + // Read the task at the front. 
+ let tmp = unsafe { buffer.deref().read(f) }; - // Try incrementing the front index to steal the value. + // Try incrementing the front index to steal the task. if self .inner .front .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) .is_err() { - // We didn't steal this value, forget it and break from the loop. - mem::forget(value); + // We didn't steal this task, forget it and break from the loop. + mem::forget(tmp); + batch_size = i; break; } - // Write the stolen value into the destination buffer. + // Write the previously stolen task into the destination buffer. unsafe { - dest_buffer.write(dest_b, value); + dest_buffer.write(dest_b, mem::replace(&mut task, tmp)); } // Move the source front index and the destination back index one step forward. f = f.wrapping_add(1); dest_b = dest_b.wrapping_add(1); - - atomic::fence(Ordering::Release); - - // Update the destination back index. - // - // This ordering could be `Relaxed`, but then thread sanitizer would falsely - // report data races because it doesn't understand fences. - dest.inner.back.store(dest_b, Ordering::Release); } - // Return the first stolen value. - Steal::Data(value) - } + // If stealing into a FIFO queue, stolen tasks need to be reversed. + if dest.flavor == Flavor::Fifo { + for i in 0..batch_size / 2 { + unsafe { + let i1 = dest_b.wrapping_sub(batch_size - i); + let i2 = dest_b.wrapping_sub(i + 1); + let t1 = dest_buffer.read(i1); + let t2 = dest_buffer.read(i2); + dest_buffer.write(i1, t2); + dest_buffer.write(i2, t1); + } + } + } + } } + + atomic::fence(Ordering::Release); + + // Update the back index in the destination queue. + // + // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data + // races because it doesn't understand fences. + dest.inner + .back + .store(dest_b, Ordering::Release); + + // Return with success. + Steal::Success(task) } } @@ -877,3 +1105,904 @@ f.pad("Stealer { .. }") } } + +// Bits indicating the state of a slot: +// * If a task has been written into the slot, `WRITE` is set. +// * If a task has been read from the slot, `READ` is set. +// * If the block is being destroyed, `DESTROY` is set. +const WRITE: usize = 1; +const READ: usize = 2; +const DESTROY: usize = 4; + +// Each block covers one "lap" of indices. +const LAP: usize = 64; +// The maximum number of values a block can hold. +const BLOCK_CAP: usize = LAP - 1; +// How many lower bits are reserved for metadata. +const SHIFT: usize = 1; +// Indicates that the block is not the last one. +const HAS_NEXT: usize = 1; + +/// A slot in a block. +struct Slot { + /// The task. + task: UnsafeCell>, + + /// The state of the slot. + state: AtomicUsize, +} + +impl Slot { + /// Waits until a task is written into the slot. + fn wait_write(&self) { + let backoff = Backoff::new(); + while self.state.load(Ordering::Acquire) & WRITE == 0 { + backoff.snooze(); + } + } +} + +/// A block in a linked list. +/// +/// Each block in the list can hold up to `BLOCK_CAP` values. +struct Block { + /// The next block in the linked list. + next: AtomicPtr>, + + /// Slots for values. + slots: [Slot; BLOCK_CAP], +} + +impl Block { + /// Creates an empty block that starts at `start_index`. + fn new() -> Block { + unsafe { mem::zeroed() } + } + + /// Waits until the next pointer is set. 
+ fn wait_next(&self) -> *mut Block { + let backoff = Backoff::new(); + loop { + let next = self.next.load(Ordering::Acquire); + if !next.is_null() { + return next; + } + backoff.snooze(); + } + } + + /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. + unsafe fn destroy(this: *mut Block, count: usize) { + // It is not necessary to set the `DESTROY` bit in the last slot because that slot has + // begun destruction of the block. + for i in (0..count).rev() { + let slot = (*this).slots.get_unchecked(i); + + // Mark the `DESTROY` bit if a thread is still using the slot. + if slot.state.load(Ordering::Acquire) & READ == 0 + && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 + { + // If a thread is still using the slot, it will continue destruction of the block. + return; + } + } + + // No thread is using the block, now it is safe to destroy it. + drop(Box::from_raw(this)); + } +} + +/// A position in a queue. +struct Position { + /// The index in the queue. + index: AtomicUsize, + + /// The block in the linked list. + block: AtomicPtr>, +} + +/// An injector queue. +/// +/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have +/// a single injector queue, which is the entry point for new tasks. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_deque::{Injector, Steal}; +/// +/// let q = Injector::new(); +/// q.push(1); +/// q.push(2); +/// +/// assert_eq!(q.steal(), Steal::Success(1)); +/// assert_eq!(q.steal(), Steal::Success(2)); +/// assert_eq!(q.steal(), Steal::Empty); +/// ``` +pub struct Injector { + /// The head of the queue. + head: CachePadded>, + + /// The tail of the queue. + tail: CachePadded>, + + /// Indicates that dropping a `Injector` may drop values of type `T`. + _marker: PhantomData, +} + +unsafe impl Send for Injector {} +unsafe impl Sync for Injector {} + +impl Injector { + /// Creates a new injector queue. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::Injector; + /// + /// let q = Injector::::new(); + /// ``` + pub fn new() -> Injector { + let block = Box::into_raw(Box::new(Block::::new())); + Injector { + head: CachePadded::new(Position { + block: AtomicPtr::new(block), + index: AtomicUsize::new(0), + }), + tail: CachePadded::new(Position { + block: AtomicPtr::new(block), + index: AtomicUsize::new(0), + }), + _marker: PhantomData, + } + } + + /// Pushes a task into the queue. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::Injector; + /// + /// let w = Injector::new(); + /// w.push(1); + /// w.push(2); + /// ``` + pub fn push(&self, task: T) { + let backoff = Backoff::new(); + let mut tail = self.tail.index.load(Ordering::Acquire); + let mut block = self.tail.block.load(Ordering::Acquire); + let mut next_block = None; + + loop { + // Calculate the offset of the index into the block. + let offset = (tail >> SHIFT) % LAP; + + // If we reached the end of the block, wait until the next one is installed. + if offset == BLOCK_CAP { + backoff.snooze(); + tail = self.tail.index.load(Ordering::Acquire); + block = self.tail.block.load(Ordering::Acquire); + continue; + } + + // If we're going to have to install the next block, allocate it in advance in order to + // make the wait for other threads as short as possible. + if offset + 1 == BLOCK_CAP && next_block.is_none() { + next_block = Some(Box::new(Block::::new())); + } + + let new_tail = tail + (1 << SHIFT); + + // Try advancing the tail forward. 
+ match self.tail.index + .compare_exchange_weak( + tail, + new_tail, + Ordering::SeqCst, + Ordering::Acquire, + ) + { + Ok(_) => unsafe { + // If we've reached the end of the block, install the next one. + if offset + 1 == BLOCK_CAP { + let next_block = Box::into_raw(next_block.unwrap()); + let next_index = new_tail.wrapping_add(1 << SHIFT); + + self.tail.block.store(next_block, Ordering::Release); + self.tail.index.store(next_index, Ordering::Release); + (*block).next.store(next_block, Ordering::Release); + } + + // Write the task into the slot. + let slot = (*block).slots.get_unchecked(offset); + slot.task.get().write(ManuallyDrop::new(task)); + slot.state.fetch_or(WRITE, Ordering::Release); + + return; + } + Err(t) => { + tail = t; + block = self.tail.block.load(Ordering::Acquire); + backoff.spin(); + } + } + } + } + + /// Steals a task from the queue. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::{Injector, Steal}; + /// + /// let q = Injector::new(); + /// q.push(1); + /// q.push(2); + /// + /// assert_eq!(q.steal(), Steal::Success(1)); + /// assert_eq!(q.steal(), Steal::Success(2)); + /// assert_eq!(q.steal(), Steal::Empty); + /// ``` + pub fn steal(&self) -> Steal { + let mut head; + let mut block; + let mut offset; + + let backoff = Backoff::new(); + loop { + head = self.head.index.load(Ordering::Acquire); + block = self.head.block.load(Ordering::Acquire); + + // Calculate the offset of the index into the block. + offset = (head >> SHIFT) % LAP; + + // If we reached the end of the block, wait until the next one is installed. + if offset == BLOCK_CAP { + backoff.snooze(); + } else { + break; + } + } + + let mut new_head = head + (1 << SHIFT); + + if new_head & HAS_NEXT == 0 { + atomic::fence(Ordering::SeqCst); + let tail = self.tail.index.load(Ordering::Relaxed); + + // If the tail equals the head, that means the queue is empty. + if head >> SHIFT == tail >> SHIFT { + return Steal::Empty; + } + + // If head and tail are not in the same block, set `HAS_NEXT` in head. + if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { + new_head |= HAS_NEXT; + } + } + + // Try moving the head index forward. + if self.head.index + .compare_exchange_weak( + head, + new_head, + Ordering::SeqCst, + Ordering::Acquire, + ) + .is_err() + { + return Steal::Retry; + } + + unsafe { + // If we've reached the end of the block, move to the next one. + if offset + 1 == BLOCK_CAP { + let next = (*block).wait_next(); + let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); + if !(*next).next.load(Ordering::Relaxed).is_null() { + next_index |= HAS_NEXT; + } + + self.head.block.store(next, Ordering::Release); + self.head.index.store(next_index, Ordering::Release); + } + + // Read the task. + let slot = (*block).slots.get_unchecked(offset); + slot.wait_write(); + let m = slot.task.get().read(); + let task = ManuallyDrop::into_inner(m); + + // Destroy the block if we've reached the end, or if another thread wanted to destroy + // but couldn't because we were busy reading from the slot. + if offset + 1 == BLOCK_CAP { + Block::destroy(block, offset); + } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { + Block::destroy(block, offset); + } + + Steal::Success(task) + } + } + + /// Steals a batch of tasks and pushes them into a worker. + /// + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than some constant limit. 
+ /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::{Injector, Worker}; + /// + /// let q = Injector::new(); + /// q.push(1); + /// q.push(2); + /// q.push(3); + /// q.push(4); + /// + /// let w = Worker::new_fifo(); + /// q.steal_batch(&w); + /// assert_eq!(w.pop(), Some(1)); + /// assert_eq!(w.pop(), Some(2)); + /// ``` + pub fn steal_batch(&self, dest: &Worker) -> Steal<()> { + let mut head; + let mut block; + let mut offset; + + let backoff = Backoff::new(); + loop { + head = self.head.index.load(Ordering::Acquire); + block = self.head.block.load(Ordering::Acquire); + + // Calculate the offset of the index into the block. + offset = (head >> SHIFT) % LAP; + + // If we reached the end of the block, wait until the next one is installed. + if offset == BLOCK_CAP { + backoff.snooze(); + } else { + break; + } + } + + let mut new_head = head; + let advance; + + if new_head & HAS_NEXT == 0 { + atomic::fence(Ordering::SeqCst); + let tail = self.tail.index.load(Ordering::Relaxed); + + // If the tail equals the head, that means the queue is empty. + if head >> SHIFT == tail >> SHIFT { + return Steal::Empty; + } + + // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate + // the right batch size to steal. + if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { + new_head |= HAS_NEXT; + // We can steal all tasks till the end of the block. + advance = (BLOCK_CAP - offset).min(MAX_BATCH); + } else { + let len = (tail - head) >> SHIFT; + // Steal half of the available tasks. + advance = ((len + 1) / 2).min(MAX_BATCH); + } + } else { + // We can steal all tasks till the end of the block. + advance = (BLOCK_CAP - offset).min(MAX_BATCH); + } + + new_head += advance << SHIFT; + let new_offset = offset + advance; + + // Try moving the head index forward. + if self.head.index + .compare_exchange_weak( + head, + new_head, + Ordering::SeqCst, + Ordering::Acquire, + ) + .is_err() + { + return Steal::Retry; + } + + // Reserve capacity for the stolen batch. + let batch_size = new_offset - offset; + dest.reserve(batch_size); + + // Get the destination buffer and back index. + let dest_buffer = dest.buffer.get(); + let dest_b = dest.inner.back.load(Ordering::Relaxed); + + unsafe { + // If we've reached the end of the block, move to the next one. + if new_offset == BLOCK_CAP { + let next = (*block).wait_next(); + let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT); + if !(*next).next.load(Ordering::Relaxed).is_null() { + next_index |= HAS_NEXT; + } + + self.head.block.store(next, Ordering::Release); + self.head.index.store(next_index, Ordering::Release); + } + + // Copy values from the injector into the destination queue. + match dest.flavor { + Flavor::Fifo => { + for i in 0..batch_size { + // Read the task. + let slot = (*block).slots.get_unchecked(offset + i); + slot.wait_write(); + let m = slot.task.get().read(); + let task = ManuallyDrop::into_inner(m); + + // Write it into the destination queue. + dest_buffer.write(dest_b.wrapping_add(i as isize), task); + } + } + + Flavor::Lifo => { + for i in 0..batch_size { + // Read the task. + let slot = (*block).slots.get_unchecked(offset + i); + slot.wait_write(); + let m = slot.task.get().read(); + let task = ManuallyDrop::into_inner(m); + + // Write it into the destination queue. + dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); + } + } + } + + atomic::fence(Ordering::Release); + + // Update the back index in the destination queue. 
+ //
+ // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
+ // data races because it doesn't understand fences.
+ dest.inner
+ .back
+ .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
+
+ // Destroy the block if we've reached the end, or if another thread wanted to destroy
+ // but couldn't because we were busy reading from the slot.
+ if new_offset == BLOCK_CAP {
+ Block::destroy(block, offset);
+ } else {
+ for i in offset..new_offset {
+ let slot = (*block).slots.get_unchecked(i);
+
+ if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
+ Block::destroy(block, offset);
+ break;
+ }
+ }
+ }
+
+ Steal::Success(())
+ }
+ }
+
+ /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
+ ///
+ /// How many tasks exactly will be stolen is not specified. That said, this method will try to
+ /// steal around half of the tasks in the queue, but also not more than some constant limit.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_deque::{Injector, Steal, Worker};
+ ///
+ /// let q = Injector::new();
+ /// q.push(1);
+ /// q.push(2);
+ /// q.push(3);
+ /// q.push(4);
+ ///
+ /// let w = Worker::new_fifo();
+ /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
+ /// assert_eq!(w.pop(), Some(2));
+ /// ```
+ pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
+ let mut head;
+ let mut block;
+ let mut offset;
+
+ let backoff = Backoff::new();
+ loop {
+ head = self.head.index.load(Ordering::Acquire);
+ block = self.head.block.load(Ordering::Acquire);
+
+ // Calculate the offset of the index into the block.
+ offset = (head >> SHIFT) % LAP;
+
+ // If we reached the end of the block, wait until the next one is installed.
+ if offset == BLOCK_CAP {
+ backoff.snooze();
+ } else {
+ break;
+ }
+ }
+
+ let mut new_head = head;
+ let advance;
+
+ if new_head & HAS_NEXT == 0 {
+ atomic::fence(Ordering::SeqCst);
+ let tail = self.tail.index.load(Ordering::Relaxed);
+
+ // If the tail equals the head, that means the queue is empty.
+ if head >> SHIFT == tail >> SHIFT {
+ return Steal::Empty;
+ }
+
+ // If head and tail are not in the same block, set `HAS_NEXT` in head.
+ if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
+ new_head |= HAS_NEXT;
+ // We can steal all tasks till the end of the block.
+ advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
+ } else {
+ let len = (tail - head) >> SHIFT;
+ // Steal half of the available tasks.
+ advance = ((len + 1) / 2).min(MAX_BATCH + 1);
+ }
+ } else {
+ // We can steal all tasks till the end of the block.
+ advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
+ }
+
+ new_head += advance << SHIFT;
+ let new_offset = offset + advance;
+
+ // Try moving the head index forward.
+ if self.head.index
+ .compare_exchange_weak(
+ head,
+ new_head,
+ Ordering::SeqCst,
+ Ordering::Acquire,
+ )
+ .is_err()
+ {
+ return Steal::Retry;
+ }
+
+ // Reserve capacity for the stolen batch.
+ let batch_size = new_offset - offset - 1;
+ dest.reserve(batch_size);
+
+ // Get the destination buffer and back index.
+ let dest_buffer = dest.buffer.get();
+ let dest_b = dest.inner.back.load(Ordering::Relaxed);
+
+ unsafe {
+ // If we've reached the end of the block, move to the next one.
+ if new_offset == BLOCK_CAP {
+ let next = (*block).wait_next();
+ let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
+ if !(*next).next.load(Ordering::Relaxed).is_null() {
+ next_index |= HAS_NEXT;
+ }
+
+ self.head.block.store(next, Ordering::Release);
+ self.head.index.store(next_index, Ordering::Release);
+ }
+
+ // Read the task.
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.wait_write();
+ let m = slot.task.get().read();
+ let task = ManuallyDrop::into_inner(m);
+
+ match dest.flavor {
+ Flavor::Fifo => {
+ // Copy values from the injector into the destination queue.
+ for i in 0..batch_size {
+ // Read the task.
+ let slot = (*block).slots.get_unchecked(offset + i + 1);
+ slot.wait_write();
+ let m = slot.task.get().read();
+ let task = ManuallyDrop::into_inner(m);
+
+ // Write it into the destination queue.
+ dest_buffer.write(dest_b.wrapping_add(i as isize), task);
+ }
+ }
+
+ Flavor::Lifo => {
+ // Copy values from the injector into the destination queue.
+ for i in 0..batch_size {
+ // Read the task.
+ let slot = (*block).slots.get_unchecked(offset + i + 1);
+ slot.wait_write();
+ let m = slot.task.get().read();
+ let task = ManuallyDrop::into_inner(m);
+
+ // Write it into the destination queue.
+ dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
+ }
+ }
+ }
+
+ atomic::fence(Ordering::Release);
+
+ // Update the back index in the destination queue.
+ //
+ // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
+ // data races because it doesn't understand fences.
+ dest.inner
+ .back
+ .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
+
+ // Destroy the block if we've reached the end, or if another thread wanted to destroy
+ // but couldn't because we were busy reading from the slot.
+ if new_offset == BLOCK_CAP {
+ Block::destroy(block, offset);
+ } else {
+ for i in offset..new_offset {
+ let slot = (*block).slots.get_unchecked(i);
+
+ if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
+ Block::destroy(block, offset);
+ break;
+ }
+ }
+ }
+
+ Steal::Success(task)
+ }
+ }
+
+ /// Returns `true` if the queue is empty.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_deque::Injector;
+ ///
+ /// let q = Injector::new();
+ ///
+ /// assert!(q.is_empty());
+ /// q.push(1);
+ /// assert!(!q.is_empty());
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ let head = self.head.index.load(Ordering::SeqCst);
+ let tail = self.tail.index.load(Ordering::SeqCst);
+ head >> SHIFT == tail >> SHIFT
+ }
+}
+
+impl<T> Drop for Injector<T> {
+ fn drop(&mut self) {
+ let mut head = self.head.index.load(Ordering::Relaxed);
+ let mut tail = self.tail.index.load(Ordering::Relaxed);
+ let mut block = self.head.block.load(Ordering::Relaxed);
+
+ // Erase the lower bits.
+ head &= !((1 << SHIFT) - 1);
+ tail &= !((1 << SHIFT) - 1);
+
+ unsafe {
+ // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
+ while head != tail {
+ let offset = (head >> SHIFT) % LAP;
+
+ if offset < BLOCK_CAP {
+ // Drop the task in the slot.
+ let slot = (*block).slots.get_unchecked(offset);
+ ManuallyDrop::drop(&mut *(*slot).task.get());
+ } else {
+ // Deallocate the block and move to the next one.
+ let next = (*block).next.load(Ordering::Relaxed);
+ drop(Box::from_raw(block));
+ block = next;
+ }
+
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Deallocate the last remaining block.
+ drop(Box::from_raw(block));
+ }
+ }
+}
+
+impl<T> fmt::Debug for Injector<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad("Injector { .. }")
+ }
+}
+
+/// Possible outcomes of a steal operation.
+///
+/// # Examples
+///
+/// There are lots of ways to chain results of steal operations together:
+///
+/// ```
+/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
+///
+/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
+///
+/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
+/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
+/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
+///
+/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
+/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
+/// ```
+#[must_use]
+#[derive(PartialEq, Eq, Copy, Clone)]
+pub enum Steal<T> {
+ /// The queue was empty at the time of stealing.
+ Empty,
+
+ /// At least one task was successfully stolen.
+ Success(T),
+
+ /// The steal operation needs to be retried.
+ Retry,
+}
+
+impl<T> Steal<T> {
+ /// Returns `true` if the queue was empty at the time of stealing.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_deque::Steal::{Empty, Retry, Success};
+ ///
+ /// assert!(!Success(7).is_empty());
+ /// assert!(!Retry::<i32>.is_empty());
+ ///
+ /// assert!(Empty::<i32>.is_empty());
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ match self {
+ Steal::Empty => true,
+ _ => false,
+ }
+ }
+
+ /// Returns `true` if at least one task was stolen.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_deque::Steal::{Empty, Retry, Success};
+ ///
+ /// assert!(!Empty::<i32>.is_success());
+ /// assert!(!Retry::<i32>.is_success());
+ ///
+ /// assert!(Success(7).is_success());
+ /// ```
+ pub fn is_success(&self) -> bool {
+ match self {
+ Steal::Success(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Returns `true` if the steal operation needs to be retried.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_deque::Steal::{Empty, Retry, Success};
+ ///
+ /// assert!(!Empty::<i32>.is_retry());
+ /// assert!(!Success(7).is_retry());
+ ///
+ /// assert!(Retry::<i32>.is_retry());
+ /// ```
+ pub fn is_retry(&self) -> bool {
+ match self {
+ Steal::Retry => true,
+ _ => false,
+ }
+ }
+
+ /// Returns the result of the operation, if successful.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_deque::Steal::{Empty, Retry, Success};
+ ///
+ /// assert_eq!(Empty::<i32>.success(), None);
+ /// assert_eq!(Retry::<i32>.success(), None);
+ ///
+ /// assert_eq!(Success(7).success(), Some(7));
+ /// ```
+ pub fn success(self) -> Option<T> {
+ match self {
+ Steal::Success(res) => Some(res),
+ _ => None,
+ }
+ }
+
+ /// If no task was stolen, attempts another steal operation.
+ ///
+ /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
+ ///
+ /// * If the second steal resulted in `Success`, it is returned.
+ /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
+ /// * If both resulted in `Empty`, then `Empty` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_deque::Steal::{Empty, Retry, Success};
+ ///
+ /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
+ /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
+ ///
+ /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
+ /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
+ ///
+ /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
+ /// ```
+ pub fn or_else<F>(self, f: F) -> Steal<T>
+ where
+ F: FnOnce() -> Steal<T>,
+ {
+ match self {
+ Steal::Empty => f(),
+ Steal::Success(_) => self,
+ Steal::Retry => {
+ if let Steal::Success(res) = f() {
+ Steal::Success(res)
+ } else {
+ Steal::Retry
+ }
+ }
+ }
+ }
+}
+
+impl<T> fmt::Debug for Steal<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ Steal::Empty => f.pad("Empty"),
+ Steal::Success(_) => f.pad("Success(..)"),
+ Steal::Retry => f.pad("Retry"),
+ }
+ }
+}
+
+impl<T> FromIterator<Steal<T>> for Steal<T> {
+ /// Consumes items until a `Success` is found and returns it.
+ ///
+ /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
+ /// Otherwise, `Empty` is returned.
+ fn from_iter<I>(iter: I) -> Steal<T>
+ where
+ I: IntoIterator<Item = Steal<T>>,
+ {
+ let mut retry = false;
+ for s in iter {
+ match &s {
+ Steal::Empty => {}
+ Steal::Success(_) => return s,
+ Steal::Retry => retry = true,
+ }
+ }
+
+ if retry {
+ Steal::Retry
+ } else {
+ Steal::Empty
+ }
+ }
+}
diff -Nru rust-crossbeam-deque-0.6.3/tests/fifo.rs rust-crossbeam-deque-0.7.1/tests/fifo.rs
--- rust-crossbeam-deque-0.6.3/tests/fifo.rs 2018-12-11 10:44:51.000000000 +0000
+++ rust-crossbeam-deque-0.7.1/tests/fifo.rs 2019-01-27 13:21:15.000000000 +0000
@@ -1,70 +1,103 @@
 extern crate crossbeam_deque as deque;
-extern crate crossbeam_epoch as epoch;
+extern crate crossbeam_utils as utils;
 extern crate rand;

 use std::sync::atomic::Ordering::SeqCst;
 use std::sync::atomic::{AtomicBool, AtomicUsize};
 use std::sync::{Arc, Mutex};
-use std::thread;
-use deque::{Pop, Steal};
+use deque::Steal::{Empty, Success};
+use deque::Worker;
 use rand::Rng;
+use utils::thread::scope;

 #[test]
 fn smoke() {
- let (w, s) = deque::fifo::();
- assert_eq!(w.pop(), Pop::Empty);
- assert_eq!(s.steal(), Steal::Empty);
+ let w = Worker::new_fifo();
+ let s = w.stealer();
+ assert_eq!(w.pop(), None);
+ assert_eq!(s.steal(), Empty);
 w.push(1);
- assert_eq!(w.pop(), Pop::Data(1));
- assert_eq!(w.pop(), Pop::Empty);
- assert_eq!(s.steal(), Steal::Empty);
+ assert_eq!(w.pop(), Some(1));
+ assert_eq!(w.pop(), None);
+ assert_eq!(s.steal(), Empty);
 w.push(2);
- assert_eq!(s.steal(), Steal::Data(2));
- assert_eq!(s.steal(), Steal::Empty);
- assert_eq!(w.pop(), Pop::Empty);
+ assert_eq!(s.steal(), Success(2));
+ assert_eq!(s.steal(), Empty);
+ assert_eq!(w.pop(), None);
 w.push(3);
 w.push(4);
 w.push(5);
- assert_eq!(s.steal(), Steal::Data(3));
- assert_eq!(s.steal(), Steal::Data(4));
- assert_eq!(s.steal(), Steal::Data(5));
- assert_eq!(s.steal(), Steal::Empty);
+ assert_eq!(s.steal(), Success(3));
+ assert_eq!(s.steal(), Success(4));
+ assert_eq!(s.steal(), Success(5));
+ assert_eq!(s.steal(), Empty);
 w.push(6);
 w.push(7);
 w.push(8);
 w.push(9);
- assert_eq!(w.pop(), Pop::Data(6));
- assert_eq!(s.steal(), Steal::Data(7));
- assert_eq!(w.pop(), Pop::Data(8));
- assert_eq!(w.pop(), Pop::Data(9));
- assert_eq!(w.pop(), Pop::Empty);
+ assert_eq!(w.pop(), Some(6));
+ assert_eq!(s.steal(), Success(7));
+ assert_eq!(w.pop(), Some(8));
+ assert_eq!(w.pop(), Some(9));
+ assert_eq!(w.pop(), None);
 }

 #[test]
-fn steal_push() {
+fn is_empty() {
+ let w = 
Worker::new_fifo(); + let s = w.stealer(); + + assert!(w.is_empty()); + w.push(1); + assert!(!w.is_empty()); + w.push(2); + assert!(!w.is_empty()); + let _ = w.pop(); + assert!(!w.is_empty()); + let _ = w.pop(); + assert!(w.is_empty()); + + assert!(s.is_empty()); + w.push(1); + assert!(!s.is_empty()); + w.push(2); + assert!(!s.is_empty()); + let _ = s.steal(); + assert!(!s.is_empty()); + let _ = s.steal(); + assert!(s.is_empty()); +} + +#[test] +fn spsc() { const STEPS: usize = 50_000; - let (w, s) = deque::fifo(); - let t = thread::spawn(move || { - for i in 0..STEPS { - loop { - if let Steal::Data(v) = s.steal() { - assert_eq!(i, v); - break; + let w = Worker::new_fifo(); + let s = w.stealer(); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..STEPS { + loop { + if let Success(v) = s.steal() { + assert_eq!(i, v); + break; + } } } - } - }); - for i in 0..STEPS { - w.push(i); - } - t.join().unwrap(); + assert_eq!(s.steal(), Empty); + }); + + for i in 0..STEPS { + w.push(i); + } + }).unwrap(); } #[test] @@ -72,136 +105,97 @@ const THREADS: usize = 8; const COUNT: usize = 50_000; - let (w, s) = deque::fifo(); + let w = Worker::new_fifo(); for i in 0..COUNT { w.push(Box::new(i + 1)); } let remaining = Arc::new(AtomicUsize::new(COUNT)); - let threads = (0..THREADS) - .map(|_| { - let s = s.clone(); + scope(|scope| { + for _ in 0..THREADS { + let s = w.stealer(); let remaining = remaining.clone(); - thread::spawn(move || { + scope.spawn(move |_| { let mut last = 0; while remaining.load(SeqCst) > 0 { - if let Steal::Data(x) = s.steal() { + if let Success(x) = s.steal() { assert!(last < *x); last = *x; remaining.fetch_sub(1, SeqCst); } } - }) - }).collect::>(); + }); + } - let mut last = 0; - while remaining.load(SeqCst) > 0 { - loop { - match w.pop() { - Pop::Data(x) => { - assert!(last < *x); - last = *x; - remaining.fetch_sub(1, SeqCst); - break; - } - Pop::Empty => break, - Pop::Retry => {} + let mut last = 0; + while remaining.load(SeqCst) > 0 { + if let Some(x) = w.pop() { + assert!(last < *x); + last = *x; + remaining.fetch_sub(1, SeqCst); } } - } - - for t in threads { - t.join().unwrap(); - } + }).unwrap(); } -fn run_stress() { +#[test] +fn stress() { const THREADS: usize = 8; const COUNT: usize = 50_000; - let (w, s) = deque::fifo(); + let w = Worker::new_fifo(); let done = Arc::new(AtomicBool::new(false)); let hits = Arc::new(AtomicUsize::new(0)); - let threads = (0..THREADS) - .map(|_| { - let s = s.clone(); + scope(|scope| { + for _ in 0..THREADS { + let s = w.stealer(); let done = done.clone(); let hits = hits.clone(); - thread::spawn(move || { - let (w2, _) = deque::fifo(); + scope.spawn(move |_| { + let w2 = Worker::new_fifo(); while !done.load(SeqCst) { - if let Steal::Data(_) = s.steal() { + if let Success(_) = s.steal() { hits.fetch_add(1, SeqCst); } - if let Steal::Data(_) = s.steal_many(&w2) { - hits.fetch_add(1, SeqCst); + let _ = s.steal_batch(&w2); - loop { - match w2.pop() { - Pop::Data(_) => { - hits.fetch_add(1, SeqCst); - } - Pop::Empty => break, - Pop::Retry => {} - } - } + if let Success(_) = s.steal_batch_and_pop(&w2) { + hits.fetch_add(1, SeqCst); } - } - }) - }).collect::>(); - let mut rng = rand::thread_rng(); - let mut expected = 0; - while expected < COUNT { - if rng.gen_range(0, 3) == 0 { - loop { - match w.pop() { - Pop::Data(_) => { + while let Some(_) = w2.pop() { hits.fetch_add(1, SeqCst); } - Pop::Empty => break, - Pop::Retry => {} } - } - } else { - w.push(expected); - expected += 1; + }); } - } - while hits.load(SeqCst) < COUNT { - loop { - match 
w.pop() { - Pop::Data(_) => { + let mut rng = rand::thread_rng(); + let mut expected = 0; + while expected < COUNT { + if rng.gen_range(0, 3) == 0 { + while let Some(_) = w.pop() { hits.fetch_add(1, SeqCst); } - Pop::Empty => break, - Pop::Retry => {} + } else { + w.push(expected); + expected += 1; } } - } - done.store(true, SeqCst); - - for t in threads { - t.join().unwrap(); - } -} - -#[test] -fn stress() { - run_stress(); -} -#[test] -fn stress_pinned() { - let _guard = epoch::pin(); - run_stress(); + while hits.load(SeqCst) < COUNT { + while let Some(_) = w.pop() { + hits.fetch_add(1, SeqCst); + } + } + done.store(true, SeqCst); + }).unwrap(); } #[test] @@ -209,71 +203,57 @@ const THREADS: usize = 8; const COUNT: usize = 50_000; - let (w, s) = deque::fifo(); + let w = Worker::new_fifo(); let done = Arc::new(AtomicBool::new(false)); + let mut all_hits = Vec::new(); - let (threads, hits): (Vec<_>, Vec<_>) = (0..THREADS) - .map(|_| { - let s = s.clone(); + scope(|scope| { + for _ in 0..THREADS { + let s = w.stealer(); let done = done.clone(); let hits = Arc::new(AtomicUsize::new(0)); + all_hits.push(hits.clone()); - let t = { - let hits = hits.clone(); - thread::spawn(move || { - let (w2, _) = deque::fifo(); - - while !done.load(SeqCst) { - if let Steal::Data(_) = s.steal() { - hits.fetch_add(1, SeqCst); - } - - if let Steal::Data(_) = s.steal_many(&w2) { - hits.fetch_add(1, SeqCst); - - loop { - match w2.pop() { - Pop::Data(_) => { - hits.fetch_add(1, SeqCst); - } - Pop::Empty => break, - Pop::Retry => {} - } - } - } - } - }) - }; - - (t, hits) - }).unzip(); - - let mut rng = rand::thread_rng(); - let mut my_hits = 0; - loop { - for i in 0..rng.gen_range(0, COUNT) { - if rng.gen_range(0, 3) == 0 && my_hits == 0 { - loop { - match w.pop() { - Pop::Data(_) => my_hits += 1, - Pop::Empty => break, - Pop::Retry => {} + scope.spawn(move |_| { + let w2 = Worker::new_fifo(); + + while !done.load(SeqCst) { + if let Success(_) = s.steal() { + hits.fetch_add(1, SeqCst); + } + + let _ = s.steal_batch(&w2); + + if let Success(_) = s.steal_batch_and_pop(&w2) { + hits.fetch_add(1, SeqCst); + } + + while let Some(_) = w2.pop() { + hits.fetch_add(1, SeqCst); } } - } else { - w.push(i); - } + }); } - if my_hits > 0 && hits.iter().all(|h| h.load(SeqCst) > 0) { - break; - } - } - done.store(true, SeqCst); + let mut rng = rand::thread_rng(); + let mut my_hits = 0; + loop { + for i in 0..rng.gen_range(0, COUNT) { + if rng.gen_range(0, 3) == 0 && my_hits == 0 { + while let Some(_) = w.pop() { + my_hits += 1; + } + } else { + w.push(i); + } + } - for t in threads { - t.join().unwrap(); - } + if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) { + break; + } + } + done.store(true, SeqCst); + }).unwrap(); } #[test] @@ -290,64 +270,50 @@ } } - let (w, s) = deque::fifo(); - + let w = Worker::new_fifo(); let dropped = Arc::new(Mutex::new(Vec::new())); let remaining = Arc::new(AtomicUsize::new(COUNT)); + for i in 0..COUNT { w.push(Elem(i, dropped.clone())); } - let threads = (0..THREADS) - .map(|_| { + scope(|scope| { + for _ in 0..THREADS { let remaining = remaining.clone(); - let s = s.clone(); + let s = w.stealer(); - thread::spawn(move || { - let (w2, _) = deque::fifo(); + scope.spawn(move |_| { + let w2 = Worker::new_fifo(); let mut cnt = 0; while cnt < STEPS { - if let Steal::Data(_) = s.steal() { + if let Success(_) = s.steal() { cnt += 1; remaining.fetch_sub(1, SeqCst); } - if let Steal::Data(_) = s.steal_many(&w2) { + let _ = s.steal_batch(&w2); + + if let Success(_) = s.steal_batch_and_pop(&w2) { 
cnt += 1; remaining.fetch_sub(1, SeqCst); + } - loop { - match w2.pop() { - Pop::Data(_) => { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - Pop::Empty => break, - Pop::Retry => {} - } - } + while let Some(_) = w2.pop() { + cnt += 1; + remaining.fetch_sub(1, SeqCst); } } - }) - }).collect::>(); + }); + } - for _ in 0..STEPS { - loop { - match w.pop() { - Pop::Data(_) => { - remaining.fetch_sub(1, SeqCst); - break; - } - Pop::Empty => break, - Pop::Retry => {} + for _ in 0..STEPS { + if let Some(_) = w.pop() { + remaining.fetch_sub(1, SeqCst); } } - } - - for t in threads { - t.join().unwrap(); - } + }).unwrap(); let rem = remaining.load(SeqCst); assert!(rem > 0); @@ -358,7 +324,7 @@ v.clear(); } - drop((w, s)); + drop(w); { let mut v = dropped.lock().unwrap(); diff -Nru rust-crossbeam-deque-0.6.3/tests/injector.rs rust-crossbeam-deque-0.7.1/tests/injector.rs --- rust-crossbeam-deque-0.6.3/tests/injector.rs 1970-01-01 00:00:00.000000000 +0000 +++ rust-crossbeam-deque-0.7.1/tests/injector.rs 2019-01-27 13:21:15.000000000 +0000 @@ -0,0 +1,347 @@ +extern crate crossbeam_deque as deque; +extern crate crossbeam_utils as utils; +extern crate rand; + +use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::{Arc, Mutex}; + +use deque::{Injector, Worker}; +use deque::Steal::{Empty, Success}; +use rand::Rng; +use utils::thread::scope; + +#[test] +fn smoke() { + let q = Injector::new(); + assert_eq!(q.steal(), Empty); + + q.push(1); + q.push(2); + assert_eq!(q.steal(), Success(1)); + assert_eq!(q.steal(), Success(2)); + assert_eq!(q.steal(), Empty); + + q.push(3); + assert_eq!(q.steal(), Success(3)); + assert_eq!(q.steal(), Empty); +} + +#[test] +fn is_empty() { + let q = Injector::new(); + assert!(q.is_empty()); + + q.push(1); + assert!(!q.is_empty()); + q.push(2); + assert!(!q.is_empty()); + + let _ = q.steal(); + assert!(!q.is_empty()); + let _ = q.steal(); + assert!(q.is_empty()); + + q.push(3); + assert!(!q.is_empty()); + let _ = q.steal(); + assert!(q.is_empty()); +} + +#[test] +fn spsc() { + const COUNT: usize = 100_000; + + let q = Injector::new(); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + loop { + if let Success(v) = q.steal() { + assert_eq!(i, v); + break; + } + } + } + + assert_eq!(q.steal(), Empty); + }); + + for i in 0..COUNT { + q.push(i); + } + }).unwrap(); +} + +#[test] +fn mpmc() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let q = Injector::new(); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::>(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for i in 0..COUNT { + q.push(i); + } + }); + } + + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + loop { + if let Success(n) = q.steal() { + v[n].fetch_add(1, SeqCst); + break; + } + } + } + }); + } + }).unwrap(); + + for c in v { + assert_eq!(c.load(SeqCst), THREADS); + } +} + +#[test] +fn stampede() { + const THREADS: usize = 8; + const COUNT: usize = 50_000; + + let q = Injector::new(); + + for i in 0..COUNT { + q.push(Box::new(i + 1)); + } + let remaining = Arc::new(AtomicUsize::new(COUNT)); + + scope(|scope| { + for _ in 0..THREADS { + let remaining = remaining.clone(); + let q = &q; + + scope.spawn(move |_| { + let mut last = 0; + while remaining.load(SeqCst) > 0 { + if let Success(x) = q.steal() { + assert!(last < *x); + last = *x; + remaining.fetch_sub(1, SeqCst); + } + } + }); + } + + let mut last = 0; + while remaining.load(SeqCst) > 0 { + if let Success(x) = q.steal() { + assert!(last < 
*x); + last = *x; + remaining.fetch_sub(1, SeqCst); + } + } + }).unwrap(); +} + +#[test] +fn stress() { + const THREADS: usize = 8; + const COUNT: usize = 50_000; + + let q = Injector::new(); + let done = Arc::new(AtomicBool::new(false)); + let hits = Arc::new(AtomicUsize::new(0)); + + scope(|scope| { + for _ in 0..THREADS { + let done = done.clone(); + let hits = hits.clone(); + let q = &q; + + scope.spawn(move |_| { + let w2 = Worker::new_fifo(); + + while !done.load(SeqCst) { + if let Success(_) = q.steal() { + hits.fetch_add(1, SeqCst); + } + + let _ = q.steal_batch(&w2); + + if let Success(_) = q.steal_batch_and_pop(&w2) { + hits.fetch_add(1, SeqCst); + } + + while let Some(_) = w2.pop() { + hits.fetch_add(1, SeqCst); + } + } + }); + } + + let mut rng = rand::thread_rng(); + let mut expected = 0; + while expected < COUNT { + if rng.gen_range(0, 3) == 0 { + while let Success(_) = q.steal() { + hits.fetch_add(1, SeqCst); + } + } else { + q.push(expected); + expected += 1; + } + } + + while hits.load(SeqCst) < COUNT { + while let Success(_) = q.steal() { + hits.fetch_add(1, SeqCst); + } + } + done.store(true, SeqCst); + }).unwrap(); +} + +#[test] +fn no_starvation() { + const THREADS: usize = 8; + const COUNT: usize = 50_000; + + let q = Injector::new(); + let done = Arc::new(AtomicBool::new(false)); + let mut all_hits = Vec::new(); + + scope(|scope| { + for _ in 0..THREADS { + let done = done.clone(); + let hits = Arc::new(AtomicUsize::new(0)); + all_hits.push(hits.clone()); + let q = &q; + + scope.spawn(move |_| { + let w2 = Worker::new_fifo(); + + while !done.load(SeqCst) { + if let Success(_) = q.steal() { + hits.fetch_add(1, SeqCst); + } + + let _ = q.steal_batch(&w2); + + if let Success(_) = q.steal_batch_and_pop(&w2) { + hits.fetch_add(1, SeqCst); + } + + while let Some(_) = w2.pop() { + hits.fetch_add(1, SeqCst); + } + } + }); + } + + let mut rng = rand::thread_rng(); + let mut my_hits = 0; + loop { + for i in 0..rng.gen_range(0, COUNT) { + if rng.gen_range(0, 3) == 0 && my_hits == 0 { + while let Success(_) = q.steal() { + my_hits += 1; + } + } else { + q.push(i); + } + } + + if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) { + break; + } + } + done.store(true, SeqCst); + }).unwrap(); +} + +#[test] +fn destructors() { + const THREADS: usize = 8; + const COUNT: usize = 50_000; + const STEPS: usize = 1000; + + struct Elem(usize, Arc>>); + + impl Drop for Elem { + fn drop(&mut self) { + self.1.lock().unwrap().push(self.0); + } + } + + let q = Injector::new(); + let dropped = Arc::new(Mutex::new(Vec::new())); + let remaining = Arc::new(AtomicUsize::new(COUNT)); + + for i in 0..COUNT { + q.push(Elem(i, dropped.clone())); + } + + scope(|scope| { + for _ in 0..THREADS { + let remaining = remaining.clone(); + let q = &q; + + scope.spawn(move |_| { + let w2 = Worker::new_fifo(); + let mut cnt = 0; + + while cnt < STEPS { + if let Success(_) = q.steal() { + cnt += 1; + remaining.fetch_sub(1, SeqCst); + } + + let _ = q.steal_batch(&w2); + + if let Success(_) = q.steal_batch_and_pop(&w2) { + cnt += 1; + remaining.fetch_sub(1, SeqCst); + } + + while let Some(_) = w2.pop() { + cnt += 1; + remaining.fetch_sub(1, SeqCst); + } + } + }); + } + + for _ in 0..STEPS { + if let Success(_) = q.steal() { + remaining.fetch_sub(1, SeqCst); + } + } + }).unwrap(); + + let rem = remaining.load(SeqCst); + assert!(rem > 0); + + { + let mut v = dropped.lock().unwrap(); + assert_eq!(v.len(), COUNT - rem); + v.clear(); + } + + drop(q); + + { + let mut v = dropped.lock().unwrap(); + 
assert_eq!(v.len(), rem); + v.sort(); + for pair in v.windows(2) { + assert_eq!(pair[0] + 1, pair[1]); + } + } +} diff -Nru rust-crossbeam-deque-0.6.3/tests/lifo.rs rust-crossbeam-deque-0.7.1/tests/lifo.rs --- rust-crossbeam-deque-0.6.3/tests/lifo.rs 2018-12-11 10:44:51.000000000 +0000 +++ rust-crossbeam-deque-0.7.1/tests/lifo.rs 2019-01-27 13:21:15.000000000 +0000 @@ -1,70 +1,103 @@ extern crate crossbeam_deque as deque; -extern crate crossbeam_epoch as epoch; +extern crate crossbeam_utils as utils; extern crate rand; use std::sync::atomic::Ordering::SeqCst; use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{Arc, Mutex}; -use std::thread; -use deque::{Pop, Steal}; +use deque::Steal::{Empty, Success}; +use deque::Worker; use rand::Rng; +use utils::thread::scope; #[test] fn smoke() { - let (w, s) = deque::lifo::(); - assert_eq!(w.pop(), Pop::Empty); - assert_eq!(s.steal(), Steal::Empty); + let w = Worker::new_lifo(); + let s = w.stealer(); + assert_eq!(w.pop(), None); + assert_eq!(s.steal(), Empty); w.push(1); - assert_eq!(w.pop(), Pop::Data(1)); - assert_eq!(w.pop(), Pop::Empty); - assert_eq!(s.steal(), Steal::Empty); + assert_eq!(w.pop(), Some(1)); + assert_eq!(w.pop(), None); + assert_eq!(s.steal(), Empty); w.push(2); - assert_eq!(s.steal(), Steal::Data(2)); - assert_eq!(s.steal(), Steal::Empty); - assert_eq!(w.pop(), Pop::Empty); + assert_eq!(s.steal(), Success(2)); + assert_eq!(s.steal(), Empty); + assert_eq!(w.pop(), None); w.push(3); w.push(4); w.push(5); - assert_eq!(s.steal(), Steal::Data(3)); - assert_eq!(s.steal(), Steal::Data(4)); - assert_eq!(s.steal(), Steal::Data(5)); - assert_eq!(s.steal(), Steal::Empty); + assert_eq!(s.steal(), Success(3)); + assert_eq!(s.steal(), Success(4)); + assert_eq!(s.steal(), Success(5)); + assert_eq!(s.steal(), Empty); w.push(6); w.push(7); w.push(8); w.push(9); - assert_eq!(w.pop(), Pop::Data(9)); - assert_eq!(s.steal(), Steal::Data(6)); - assert_eq!(w.pop(), Pop::Data(8)); - assert_eq!(w.pop(), Pop::Data(7)); - assert_eq!(w.pop(), Pop::Empty); + assert_eq!(w.pop(), Some(9)); + assert_eq!(s.steal(), Success(6)); + assert_eq!(w.pop(), Some(8)); + assert_eq!(w.pop(), Some(7)); + assert_eq!(w.pop(), None); } #[test] -fn steal_push() { +fn is_empty() { + let w = Worker::new_lifo(); + let s = w.stealer(); + + assert!(w.is_empty()); + w.push(1); + assert!(!w.is_empty()); + w.push(2); + assert!(!w.is_empty()); + let _ = w.pop(); + assert!(!w.is_empty()); + let _ = w.pop(); + assert!(w.is_empty()); + + assert!(s.is_empty()); + w.push(1); + assert!(!s.is_empty()); + w.push(2); + assert!(!s.is_empty()); + let _ = s.steal(); + assert!(!s.is_empty()); + let _ = s.steal(); + assert!(s.is_empty()); +} + +#[test] +fn spsc() { const STEPS: usize = 50_000; - let (w, s) = deque::lifo(); - let t = thread::spawn(move || { - for i in 0..STEPS { - loop { - if let Steal::Data(v) = s.steal() { - assert_eq!(i, v); - break; + let w = Worker::new_lifo(); + let s = w.stealer(); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..STEPS { + loop { + if let Success(v) = s.steal() { + assert_eq!(i, v); + break; + } } } - } - }); - for i in 0..STEPS { - w.push(i); - } - t.join().unwrap(); + assert_eq!(s.steal(), Empty); + }); + + for i in 0..STEPS { + w.push(i); + } + }).unwrap(); } #[test] @@ -72,136 +105,97 @@ const THREADS: usize = 8; const COUNT: usize = 50_000; - let (w, s) = deque::lifo(); + let w = Worker::new_lifo(); for i in 0..COUNT { w.push(Box::new(i + 1)); } let remaining = Arc::new(AtomicUsize::new(COUNT)); - let threads = (0..THREADS) - .map(|_| { 
- let s = s.clone(); + scope(|scope| { + for _ in 0..THREADS { + let s = w.stealer(); let remaining = remaining.clone(); - thread::spawn(move || { + scope.spawn(move |_| { let mut last = 0; while remaining.load(SeqCst) > 0 { - if let Steal::Data(x) = s.steal() { + if let Success(x) = s.steal() { assert!(last < *x); last = *x; remaining.fetch_sub(1, SeqCst); } } - }) - }).collect::>(); + }); + } - let mut last = COUNT + 1; - while remaining.load(SeqCst) > 0 { - loop { - match w.pop() { - Pop::Data(x) => { - assert!(last > *x); - last = *x; - remaining.fetch_sub(1, SeqCst); - break; - } - Pop::Empty => break, - Pop::Retry => {} + let mut last = COUNT + 1; + while remaining.load(SeqCst) > 0 { + if let Some(x) = w.pop() { + assert!(last > *x); + last = *x; + remaining.fetch_sub(1, SeqCst); } } - } - - for t in threads { - t.join().unwrap(); - } + }).unwrap(); } -fn run_stress() { +#[test] +fn stress() { const THREADS: usize = 8; const COUNT: usize = 50_000; - let (w, s) = deque::lifo(); + let w = Worker::new_lifo(); let done = Arc::new(AtomicBool::new(false)); let hits = Arc::new(AtomicUsize::new(0)); - let threads = (0..THREADS) - .map(|_| { - let s = s.clone(); + scope(|scope| { + for _ in 0..THREADS { + let s = w.stealer(); let done = done.clone(); let hits = hits.clone(); - thread::spawn(move || { - let (w2, _) = deque::lifo(); + scope.spawn(move |_| { + let w2 = Worker::new_lifo(); while !done.load(SeqCst) { - if let Steal::Data(_) = s.steal() { + if let Success(_) = s.steal() { hits.fetch_add(1, SeqCst); } - if let Steal::Data(_) = s.steal_many(&w2) { - hits.fetch_add(1, SeqCst); + let _ = s.steal_batch(&w2); - loop { - match w2.pop() { - Pop::Data(_) => { - hits.fetch_add(1, SeqCst); - } - Pop::Empty => break, - Pop::Retry => {} - } - } + if let Success(_) = s.steal_batch_and_pop(&w2) { + hits.fetch_add(1, SeqCst); } - } - }) - }).collect::>(); - let mut rng = rand::thread_rng(); - let mut expected = 0; - while expected < COUNT { - if rng.gen_range(0, 3) == 0 { - loop { - match w.pop() { - Pop::Data(_) => { + while let Some(_) = w2.pop() { hits.fetch_add(1, SeqCst); } - Pop::Empty => break, - Pop::Retry => {} } - } - } else { - w.push(expected); - expected += 1; + }); } - } - while hits.load(SeqCst) < COUNT { - loop { - match w.pop() { - Pop::Data(_) => { + let mut rng = rand::thread_rng(); + let mut expected = 0; + while expected < COUNT { + if rng.gen_range(0, 3) == 0 { + while let Some(_) = w.pop() { hits.fetch_add(1, SeqCst); } - Pop::Empty => break, - Pop::Retry => {} + } else { + w.push(expected); + expected += 1; } } - } - done.store(true, SeqCst); - - for t in threads { - t.join().unwrap(); - } -} - -#[test] -fn stress() { - run_stress(); -} -#[test] -fn stress_pinned() { - let _guard = epoch::pin(); - run_stress(); + while hits.load(SeqCst) < COUNT { + while let Some(_) = w.pop() { + hits.fetch_add(1, SeqCst); + } + } + done.store(true, SeqCst); + }).unwrap(); } #[test] @@ -209,71 +203,57 @@ const THREADS: usize = 8; const COUNT: usize = 50_000; - let (w, s) = deque::lifo(); + let w = Worker::new_lifo(); let done = Arc::new(AtomicBool::new(false)); + let mut all_hits = Vec::new(); - let (threads, hits): (Vec<_>, Vec<_>) = (0..THREADS) - .map(|_| { - let s = s.clone(); + scope(|scope| { + for _ in 0..THREADS { + let s = w.stealer(); let done = done.clone(); let hits = Arc::new(AtomicUsize::new(0)); + all_hits.push(hits.clone()); - let t = { - let hits = hits.clone(); - thread::spawn(move || { - let (w2, _) = deque::lifo(); - - while !done.load(SeqCst) { - if let Steal::Data(_) = 
s.steal() { - hits.fetch_add(1, SeqCst); - } - - if let Steal::Data(_) = s.steal_many(&w2) { - hits.fetch_add(1, SeqCst); - - loop { - match w2.pop() { - Pop::Data(_) => { - hits.fetch_add(1, SeqCst); - } - Pop::Empty => break, - Pop::Retry => {} - } - } - } - } - }) - }; - - (t, hits) - }).unzip(); - - let mut rng = rand::thread_rng(); - let mut my_hits = 0; - loop { - for i in 0..rng.gen_range(0, COUNT) { - if rng.gen_range(0, 3) == 0 && my_hits == 0 { - loop { - match w.pop() { - Pop::Data(_) => my_hits += 1, - Pop::Empty => break, - Pop::Retry => {} + scope.spawn(move |_| { + let w2 = Worker::new_lifo(); + + while !done.load(SeqCst) { + if let Success(_) = s.steal() { + hits.fetch_add(1, SeqCst); + } + + let _ = s.steal_batch(&w2); + + if let Success(_) = s.steal_batch_and_pop(&w2) { + hits.fetch_add(1, SeqCst); + } + + while let Some(_) = w2.pop() { + hits.fetch_add(1, SeqCst); } } - } else { - w.push(i); - } + }); } - if my_hits > 0 && hits.iter().all(|h| h.load(SeqCst) > 0) { - break; - } - } - done.store(true, SeqCst); + let mut rng = rand::thread_rng(); + let mut my_hits = 0; + loop { + for i in 0..rng.gen_range(0, COUNT) { + if rng.gen_range(0, 3) == 0 && my_hits == 0 { + while let Some(_) = w.pop() { + my_hits += 1; + } + } else { + w.push(i); + } + } - for t in threads { - t.join().unwrap(); - } + if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) { + break; + } + } + done.store(true, SeqCst); + }).unwrap(); } #[test] @@ -290,64 +270,50 @@ } } - let (w, s) = deque::lifo(); - + let w = Worker::new_lifo(); let dropped = Arc::new(Mutex::new(Vec::new())); let remaining = Arc::new(AtomicUsize::new(COUNT)); + for i in 0..COUNT { w.push(Elem(i, dropped.clone())); } - let threads = (0..THREADS) - .map(|_| { + scope(|scope| { + for _ in 0..THREADS { let remaining = remaining.clone(); - let s = s.clone(); + let s = w.stealer(); - thread::spawn(move || { - let (w2, _) = deque::lifo(); + scope.spawn(move |_| { + let w2 = Worker::new_lifo(); let mut cnt = 0; while cnt < STEPS { - if let Steal::Data(_) = s.steal() { + if let Success(_) = s.steal() { cnt += 1; remaining.fetch_sub(1, SeqCst); } - if let Steal::Data(_) = s.steal_many(&w2) { + let _ = s.steal_batch(&w2); + + if let Success(_) = s.steal_batch_and_pop(&w2) { cnt += 1; remaining.fetch_sub(1, SeqCst); + } - loop { - match w2.pop() { - Pop::Data(_) => { - cnt += 1; - remaining.fetch_sub(1, SeqCst); - } - Pop::Empty => break, - Pop::Retry => {} - } - } + while let Some(_) = w2.pop() { + cnt += 1; + remaining.fetch_sub(1, SeqCst); } } - }) - }).collect::>(); + }); + } - for _ in 0..STEPS { - loop { - match w.pop() { - Pop::Data(_) => { - remaining.fetch_sub(1, SeqCst); - break; - } - Pop::Empty => break, - Pop::Retry => {} + for _ in 0..STEPS { + if let Some(_) = w.pop() { + remaining.fetch_sub(1, SeqCst); } } - } - - for t in threads { - t.join().unwrap(); - } + }).unwrap(); let rem = remaining.load(SeqCst); assert!(rem > 0); @@ -358,7 +324,7 @@ v.clear(); } - drop((w, s)); + drop(w); { let mut v = dropped.lock().unwrap(); diff -Nru rust-crossbeam-deque-0.6.3/tests/steal.rs rust-crossbeam-deque-0.7.1/tests/steal.rs --- rust-crossbeam-deque-0.6.3/tests/steal.rs 1970-01-01 00:00:00.000000000 +0000 +++ rust-crossbeam-deque-0.7.1/tests/steal.rs 2019-01-27 13:21:15.000000000 +0000 @@ -0,0 +1,214 @@ +extern crate crossbeam_deque as deque; + +use deque::Steal::Success; +use deque::{Injector, Worker}; + +#[test] +fn steal_fifo() { + let w = Worker::new_fifo(); + for i in 1..=3 { + w.push(i); + } + + let s = w.stealer(); + 
assert_eq!(s.steal(), Success(1)); + assert_eq!(s.steal(), Success(2)); + assert_eq!(s.steal(), Success(3)); +} + +#[test] +fn steal_lifo() { + let w = Worker::new_lifo(); + for i in 1..=3 { + w.push(i); + } + + let s = w.stealer(); + assert_eq!(s.steal(), Success(1)); + assert_eq!(s.steal(), Success(2)); + assert_eq!(s.steal(), Success(3)); +} + +#[test] +fn steal_injector() { + let q = Injector::new(); + for i in 1..=3 { + q.push(i); + } + + assert_eq!(q.steal(), Success(1)); + assert_eq!(q.steal(), Success(2)); + assert_eq!(q.steal(), Success(3)); +} + +#[test] +fn steal_batch_fifo_fifo() { + let w = Worker::new_fifo(); + for i in 1..=4 { + w.push(i); + } + + let s = w.stealer(); + let w2 = Worker::new_fifo(); + + assert_eq!(s.steal_batch(&w2), Success(())); + assert_eq!(w2.pop(), Some(1)); + assert_eq!(w2.pop(), Some(2)); +} + +#[test] +fn steal_batch_lifo_lifo() { + let w = Worker::new_lifo(); + for i in 1..=4 { + w.push(i); + } + + let s = w.stealer(); + let w2 = Worker::new_lifo(); + + assert_eq!(s.steal_batch(&w2), Success(())); + assert_eq!(w2.pop(), Some(2)); + assert_eq!(w2.pop(), Some(1)); +} + +#[test] +fn steal_batch_fifo_lifo() { + let w = Worker::new_fifo(); + for i in 1..=4 { + w.push(i); + } + + let s = w.stealer(); + let w2 = Worker::new_lifo(); + + assert_eq!(s.steal_batch(&w2), Success(())); + assert_eq!(w2.pop(), Some(1)); + assert_eq!(w2.pop(), Some(2)); +} + +#[test] +fn steal_batch_lifo_fifo() { + let w = Worker::new_lifo(); + for i in 1..=4 { + w.push(i); + } + + let s = w.stealer(); + let w2 = Worker::new_fifo(); + + assert_eq!(s.steal_batch(&w2), Success(())); + assert_eq!(w2.pop(), Some(2)); + assert_eq!(w2.pop(), Some(1)); +} + +#[test] +fn steal_batch_injector_fifo() { + let q = Injector::new(); + for i in 1..=4 { + q.push(i); + } + + let w2 = Worker::new_fifo(); + assert_eq!(q.steal_batch(&w2), Success(())); + assert_eq!(w2.pop(), Some(1)); + assert_eq!(w2.pop(), Some(2)); +} + +#[test] +fn steal_batch_injector_lifo() { + let q = Injector::new(); + for i in 1..=4 { + q.push(i); + } + + let w2 = Worker::new_lifo(); + assert_eq!(q.steal_batch(&w2), Success(())); + assert_eq!(w2.pop(), Some(1)); + assert_eq!(w2.pop(), Some(2)); +} + +#[test] +fn steal_batch_and_pop_fifo_fifo() { + let w = Worker::new_fifo(); + for i in 1..=6 { + w.push(i); + } + + let s = w.stealer(); + let w2 = Worker::new_fifo(); + + assert_eq!(s.steal_batch_and_pop(&w2), Success(1)); + assert_eq!(w2.pop(), Some(2)); + assert_eq!(w2.pop(), Some(3)); +} + +#[test] +fn steal_batch_and_pop_lifo_lifo() { + let w = Worker::new_lifo(); + for i in 1..=6 { + w.push(i); + } + + let s = w.stealer(); + let w2 = Worker::new_lifo(); + + assert_eq!(s.steal_batch_and_pop(&w2), Success(3)); + assert_eq!(w2.pop(), Some(2)); + assert_eq!(w2.pop(), Some(1)); +} + +#[test] +fn steal_batch_and_pop_fifo_lifo() { + let w = Worker::new_fifo(); + for i in 1..=6 { + w.push(i); + } + + let s = w.stealer(); + let w2 = Worker::new_lifo(); + + assert_eq!(s.steal_batch_and_pop(&w2), Success(1)); + assert_eq!(w2.pop(), Some(2)); + assert_eq!(w2.pop(), Some(3)); +} + +#[test] +fn steal_batch_and_pop_lifo_fifo() { + let w = Worker::new_lifo(); + for i in 1..=6 { + w.push(i); + } + + let s = w.stealer(); + let w2 = Worker::new_fifo(); + + assert_eq!(s.steal_batch_and_pop(&w2), Success(3)); + assert_eq!(w2.pop(), Some(2)); + assert_eq!(w2.pop(), Some(1)); +} + +#[test] +fn steal_batch_and_pop_injector_fifo() { + let q = Injector::new(); + for i in 1..=6 { + q.push(i); + } + + let w2 = Worker::new_fifo(); + 
assert_eq!(q.steal_batch_and_pop(&w2), Success(1)); + assert_eq!(w2.pop(), Some(2)); + assert_eq!(w2.pop(), Some(3)); +} + +#[test] +fn steal_batch_and_pop_injector_lifo() { + let q = Injector::new(); + for i in 1..=6 { + q.push(i); + } + + let w2 = Worker::new_lifo(); + assert_eq!(q.steal_batch_and_pop(&w2), Success(1)); + assert_eq!(w2.pop(), Some(2)); + assert_eq!(w2.pop(), Some(3)); +}
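Editor's note (not part of the upstream patch): the pieces introduced in this release and exercised by the tests above -- `Injector`, `Worker::new_fifo()`, `Stealer`, `Steal::or_else()` and the `FromIterator` implementation for `Steal` -- are meant to be composed into a task-scheduling loop. The sketch below shows one way to wire them together. The `find_task` helper, the FIFO flavor and the concrete queue contents are illustrative assumptions; the types and methods used are the ones added by the 0.7.1 code in this diff.

extern crate crossbeam_deque;

use std::iter;

use crossbeam_deque::{Injector, Stealer, Worker};

// Look for a task: first in the thread-local worker, then in the global
// injector (stealing a batch and popping one task), then in the other
// workers' stealers. `Steal::Retry` results keep the loop going; collecting
// the stealers' results into a single `Steal` relies on the `FromIterator`
// impl added in this release.
fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
    local.pop().or_else(|| {
        iter::repeat_with(|| {
            global
                .steal_batch_and_pop(local)
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        .find(|s| !s.is_retry())
        .and_then(|s| s.success())
    })
}

fn main() {
    let global = Injector::new();
    let local = Worker::new_fifo();
    let other = Worker::new_fifo();
    let stealers = vec![other.stealer()];

    global.push(1);
    other.push(2);

    // The global injector is drained first, then the other worker is stolen from.
    assert_eq!(find_task(&local, &global, &stealers), Some(1));
    assert_eq!(find_task(&local, &global, &stealers), Some(2));
    assert_eq!(find_task(&local, &global, &stealers), None);
}

In a real scheduler each thread would own one `Worker`, hold the `Stealer`s of all the other threads, and call a helper like `find_task` in its work loop, much as the rewritten stress tests above do with `crossbeam_utils::thread::scope`.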