Async Rust with Tokio Part 4: Tasks, Spawning, and Structured Concurrency

A future by itself runs sequentially. To get real concurrency – multiple things happening at the same time – you need tasks. A task is an independently scheduled unit of work. Tokio runs tasks concurrently on its worker thread pool, switching between them at await points. This post covers how to spawn tasks, manage their lifetimes, handle results, and cancel them cleanly.

tokio::spawn: Creating Independent Tasks

use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // Spawn a task - runs concurrently with main
    let handle: JoinHandle<i32> = tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        42
    });

    // Do other work while the task runs
    println!("Task spawned, doing other work");

    // Await the result
    match handle.await {
        Ok(value) => println!("Task returned: {}", value),
        Err(e) if e.is_cancelled() => println!("Task was cancelled"),
        Err(e) => println!("Task panicked: {:?}", e),
    }
}

tokio::spawn requires the future to be Send + 'static. Send because tasks can migrate between worker threads at await points. 'static because the task may outlive the spawning scope – Tokio cannot guarantee the task finishes before the spawner returns.

JoinHandle and Task Results

JoinHandle<T> is a future that resolves to Result<T, JoinError>. The Ok variant contains the task’s return value. The Err variant covers two cases: the task panicked, or the task was cancelled via abort().

use tokio::task::JoinHandle;
use std::time::Duration;

async fn spawn_multiple() {
    // Spawn several tasks
    let handles: Vec<JoinHandle<String>> = (0..5)
        .map(|i| {
            tokio::spawn(async move {
                tokio::time::sleep(Duration::from_millis(i * 10)).await;
                format!("task-{}", i)
            })
        })
        .collect();

    // Wait for all, collect results
    let mut results = Vec::new();
    for handle in handles {
        match handle.await {
            Ok(result) => results.push(result),
            Err(e) => eprintln!("Task failed: {:?}", e),
        }
    }
    println!("{:?}", results);
}

// Using JoinSet for cleaner multi-task waiting
use tokio::task::JoinSet;

async fn spawn_with_joinset() {
    let mut set = JoinSet::new();

    for i in 0..5 {
        set.spawn(async move {
            tokio::time::sleep(Duration::from_millis(i * 10)).await;
            format!("task-{}", i)
        });
    }

    // Process results as they complete (not in spawn order)
    while let Some(result) = set.join_next().await {
        println!("{:?}", result);
    }
}

JoinSet is the modern way to manage a collection of tasks. It handles cleanup automatically – when the JoinSet is dropped, all remaining tasks are aborted. This is structured concurrency: the task collection is scoped to the lifetime of the JoinSet.

Task Cancellation

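Cancellation in async Rust works by dropping the future. A dropped future is never polled again, so execution stops at the await point where it was last suspended, and all of its local variables are dropped in the normal Rust way. This makes cancellation memory-safe, but any cleanup you need must live in Drop implementations, because code after that await point never runs.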

use tokio::time::{sleep, Duration};

async fn cancellable_task() {
    // Using select! to race a task against a timeout
    tokio::select! {
        result = long_running_operation() => {
            println!("Completed: {:?}", result);
        }
        _ = sleep(Duration::from_secs(5)) => {
            println!("Timed out - long_running_operation future dropped here");
            // The operation's future is dropped, all its locals cleaned up
        }
    }
}

async fn long_running_operation() -> String {
    // Resource acquired - will be cleaned up if dropped
    let _guard = ResourceGuard::new(); // Drop impl runs on cancellation
    sleep(Duration::from_secs(10)).await;
    String::from("done")
}

struct ResourceGuard;
impl ResourceGuard {
    fn new() -> Self { println!("Resource acquired"); ResourceGuard }
}
impl Drop for ResourceGuard {
    fn drop(&mut self) { println!("Resource released"); }
}

// Explicit abort via JoinHandle
async fn explicit_abort() {
    let handle = tokio::spawn(async {
        sleep(Duration::from_secs(100)).await;
        "never reached"
    });

    sleep(Duration::from_millis(10)).await;
    handle.abort(); // sends cancellation signal

    match handle.await {
        Err(e) if e.is_cancelled() => println!("Task was aborted"),
        _ => {}
    }
}

Structured Concurrency With JoinSet

flowchart TD
    A[Parent Task] --> B[JoinSet::new]
    B --> C[spawn task 1]
    B --> D[spawn task 2]
    B --> E[spawn task 3]
    C --> F{join_next}
    D --> F
    E --> F
    F -->|result available| G[Process result]
    G --> F
    F -->|all done| H[JoinSet empty]
    H --> I[Parent continues]

    subgraph Cleanup ["On Drop / Scope Exit"]
        J[JoinSet dropped] --> K[All remaining tasks aborted]
    end

Spawning Tasks That Need Shared State

Because spawned tasks must be 'static, they cannot borrow from the spawning scope. Shared state must be moved in, typically via Arc:

use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Default)]
struct SharedState {
    counter: u64,
    results: Vec<String>,
}

async fn spawn_with_shared_state() {
    let state = Arc::new(Mutex::new(SharedState::default()));

    let mut handles = vec![];
    for i in 0..10 {
        let state = Arc::clone(&state);
        handles.push(tokio::spawn(async move {
            // Simulate work
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;

            // Update shared state
            let mut lock = state.lock().await;
            lock.counter += 1;
            lock.results.push(format!("result-{}", i));
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }

    let final_state = state.lock().await;
    println!("Counter: {}", final_state.counter);
    println!("Results: {:?}", final_state.results);
}

tokio::task::spawn_local: Non-Send Tasks

For single-threaded contexts or when you need non-Send types in tasks, use tokio::task::spawn_local with a LocalSet:

use tokio::task::{LocalSet, spawn_local};
use std::rc::Rc; // Rc is not Send

async fn local_tasks() {
    let local = LocalSet::new();

    local.run_until(async {
        let data = Rc::new(vec![1, 2, 3]); // Rc is not Send - fine in LocalSet

        spawn_local({
            let data = Rc::clone(&data);
            async move {
                println!("{:?}", data);
            }
        }).await.unwrap();
    }).await;
}
// All tasks in a LocalSet run on a single thread
// They interleave cooperatively but never run in parallel

Task Priorities and Yielding

Tokio does not have built-in task priorities. All tasks compete equally for worker thread time. But you can voluntarily yield from a long-running task to give other tasks a chance to run:

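// `large_dataset` is passed in so the snippet is complete; the u64 element
// type is illustrative. `process_chunk` stands for your own synchronous
// per-chunk work.
async fn cpu_intensive_async(large_dataset: &[u64]) {
    for chunk in large_dataset.chunks(1000) {
        process_chunk(chunk); // CPU work, no await

        // Yield to let other tasks run between chunks
        tokio::task::yield_now().await;
    }
}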

// yield_now() immediately returns Poll::Pending once,
// then the task is re-queued and polled again on the next scheduler turn.
// Use this to prevent a CPU-heavy async task from starving others.

Common Spawning Mistakes

Mistake 1: Dropping JoinHandle Without Awaiting

// WRONG: dropping the handle does NOT cancel the task
async fn leak_task() {
    let _handle = tokio::spawn(async {
        // This task runs until completion even if _handle is dropped
        do_work().await;
    });
    // _handle dropped here but task keeps running - it is "detached"
}

// CORRECT: if you want fire-and-forget, be intentional about it
async fn intentional_detach() {
    tokio::spawn(async {
        do_work().await;
    });
    // Handle dropped immediately - task is detached by design
}

// CORRECT: if you want to cancel on drop, use a JoinSet
// (dropping a JoinHandle or an AbortHandle does not abort the task)
async fn cancel_on_drop() {
    let mut set = JoinSet::new();
    set.spawn(async { do_work().await });
    // JoinSet dropped here -> task aborted automatically
}

Mistake 2: Spawning Inside a Loop Without Backpressure

// DANGEROUS: spawns unbounded tasks, can exhaust memory
async fn unbounded_spawn(items: Vec<Item>) { // Item: placeholder for your element type
    for item in items {
        tokio::spawn(async move { process(item).await });
    }
}

// CORRECT: limit concurrency with a semaphore
use tokio::sync::Semaphore;
use std::sync::Arc;

async fn bounded_spawn(items: Vec<Item>) { // Item: placeholder for your element type
    let semaphore = Arc::new(Semaphore::new(100)); // max 100 concurrent tasks

    let mut handles = vec![];
    for item in items {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // dropped when task finishes, releasing slot
            process(item).await
        }));
    }

    for h in handles { h.await.unwrap(); }
}

What Comes Next

Tasks communicate through channels. Part 5 covers Tokio’s channel types – mpsc, oneshot, broadcast, and watch – and the patterns that determine which one to use for a given communication need.
