Async Rust with Tokio Part 7: Async Error Handling and Cancellation Safety

Error handling in async Rust follows the same Result-based approach as synchronous Rust. But cancellation is different. When a future is dropped, whether by select!, timeout, or abort(), it simply never runs again: execution stops at whichever await point it was last suspended at. Understanding this behavior is essential for writing async code that does not lose data or corrupt state when cancelled.

Error Propagation in Async Functions

use anyhow::Result;

// Error propagation works identically to sync Rust
async fn fetch_and_process(url: &str) -> Result<String> {
    let response = reqwest::get(url).await?;       // ? works in async
    let text = response.text().await?;
    let processed = process(&text)?;               // sync ? also works
    Ok(processed)
}

// Combining multiple fallible async operations
async fn pipeline() -> Result<()> {
    let data = fetch_data().await?;
    let validated = validate(data)?;
    let stored = store(validated).await?;
    notify(stored).await?;
    Ok(())
}

// Handling errors from spawned tasks
async fn spawn_with_error_handling() -> Result<()> {
    let handle = tokio::spawn(async {
        fetch_and_process("https://example.com").await
    });

    match handle.await {
        Ok(Ok(result)) => println!("Success: {}", result),
        Ok(Err(app_err)) => eprintln!("Application error: {:?}", app_err),
        Err(join_err) => eprintln!("Task panicked or was cancelled: {:?}", join_err),
    }

    Ok(())
}

What Cancellation Means in Async Rust

In async Rust, cancellation means dropping the future. A dropped future stops at the await point where it was suspended and never resumes; the code after that point simply does not run. All local variables in the future are dropped in the normal Rust order. This is deterministic and memory-safe, but it can interrupt multi-step operations partway through.

async fn risky_operation() {
    let mut connection = db_connect().await;       // step 1: connects
    connection.begin_transaction().await;          // step 2: starts transaction
    let data = fetch_data().await;                 // step 3: fetches

    // IF CANCELLED HERE (future dropped), the transaction is open but
    // no rollback has been called. The connection's Drop impl may or
    // may not handle this depending on the library.

    connection.insert(data).await;                 // step 4
    connection.commit().await;                     // step 5
}

// The safe version: use RAII for cleanup
struct Transaction {
    inner: DbConnection,
    committed: bool,
}

impl Drop for Transaction {
    fn drop(&mut self) {
        if !self.committed {
            // Best effort rollback - note: can't await in Drop
            // Libraries typically use a blocking call or fire a background task
            self.inner.rollback_sync();
        }
    }
}
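The Drop contract above can be demonstrated with plain std: poll a future once so it parks at an await point, then drop it, which is exactly what select! or timeout does to a losing branch. The Guard type and the no-op waker below are illustrative stand-ins, not part of any library:

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::task::{Context, RawWaker, RawWakerVTable, Waker};

// Stand-in for the Transaction guard: records whether its Drop
// ran without a commit (i.e., a "rollback" happened).
struct Guard {
    rolled_back: Rc<Cell<bool>>,
    committed: bool,
}

impl Drop for Guard {
    fn drop(&mut self) {
        if !self.committed {
            self.rolled_back.set(true); // best-effort "rollback"
        }
    }
}

// Minimal waker that does nothing; enough to poll a future by hand.
fn noop_waker() -> Waker {
    fn clone(p: *const ()) -> RawWaker { RawWaker::new(p, &VTABLE) }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Polls the future once (it suspends at its first await point),
// then drops it, simulating cancellation by select!/timeout/abort.
fn cancelled_guard_rolls_back() -> bool {
    let flag = Rc::new(Cell::new(false));
    let flag2 = flag.clone();
    let fut = async move {
        let _guard = Guard { rolled_back: flag2, committed: false };
        std::future::pending::<()>().await; // suspended here forever
    };
    {
        let mut fut = pin!(fut);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        assert!(fut.as_mut().poll(&mut cx).is_pending());
    } // future dropped here while suspended -> Guard::drop runs
    flag.get()
}

fn main() {
    assert!(cancelled_guard_rolls_back());
    println!("rollback ran when the suspended future was dropped");
}
```

The key observation: the guard's Drop runs even though the line after the await was never reached, which is why RAII is the reliable place for cancellation cleanup.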

Cancellation Safety

A future is cancellation-safe if dropping it at any await point leaves all external state consistent. Tokio’s documentation marks many futures with their cancellation safety status. This matters most when using select!.

use tokio::sync::mpsc;

// CANCELLATION SAFE: recv() is safe to cancel
// If dropped before a message arrives, no message is lost
async fn safe_example(mut rx: mpsc::Receiver<String>) {
    tokio::select! {
        msg = rx.recv() => {
            // If this branch is not selected, no message is consumed
            println!("{:?}", msg);
        }
        _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
            println!("timeout");
        }
    }
}

// NOT CANCELLATION SAFE: multi-step reads like read_exact
use tokio::io::AsyncReadExt;

async fn unsafe_read(mut reader: impl tokio::io::AsyncRead + Unpin) {
    let mut buf = [0u8; 1024];
    // read_exact may need several reads to fill buf. If this future is
    // cancelled after some of them, the bytes already pulled off the
    // stream are lost and the stream is left mid-message.
    reader.read_exact(&mut buf).await.unwrap();
    process(&buf);
}

// SAFE version: buffer partial reads in the struct itself
struct SafeReader<R> {
    reader: R,
    buffer: Vec<u8>,
}

impl<R: tokio::io::AsyncRead + Unpin> SafeReader<R> {
    // Cancellation-safe: bytes already read live in self.buffer, so
    // dropping this future between awaits loses nothing; the next
    // call picks up where the last one left off.
    async fn read_line(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        loop {
            if let Some(pos) = self.buffer.iter().position(|&b| b == b'\n') {
                let rest = self.buffer.split_off(pos + 1);
                return Ok(Some(std::mem::replace(&mut self.buffer, rest)));
            }
            let mut chunk = [0u8; 1024];
            let n = self.reader.read(&mut chunk).await?;
            if n == 0 {
                return Ok(None); // EOF
            }
            self.buffer.extend_from_slice(&chunk[..n]);
        }
    }
}

The select! Macro in Depth

flowchart TD
    A["tokio::select! starts"] --> B[Poll all branches simultaneously]
    B --> C{Any branch ready?}
    C -->|No| D[Return Pending\nregister all wakers]
    D --> E[Runtime wakes task\nwhen any branch ready]
    E --> B
    C -->|One ready| F[Execute that branch's handler]
    F --> G[Drop all OTHER branch futures]
    G --> H[select! completes]
    C -->|Multiple ready| I[Pick one pseudorandomly\nfor fairness]
    I --> F

use tokio::select;
use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, Duration};

// Basic select - races two futures
async fn basic_select() {
    select! {
        val = async_op_1() => println!("op1 won: {}", val),
        val = async_op_2() => println!("op2 won: {}", val),
    }
}

// select! in a loop - common pattern for event loops
async fn event_loop(
    mut work_rx: mpsc::Receiver<WorkItem>,
    mut shutdown_rx: watch::Receiver<bool>,
) {
    loop {
        select! {
            // biased: makes the first matching branch deterministically
            // win instead of pseudorandom selection.
            // Use when one branch should have priority.
            biased;

            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() { break; }
            }
            Some(work) = work_rx.recv() => {
                process_work(work).await;
            }
        }
    }
}

// select! with preconditions - disable branches conditionally
async fn conditional_select(condition: bool) {
    select! {
        val = expensive_op(), if condition => {
            // Only races this branch if condition is true
            println!("conditional branch: {}", val);
        }
        _ = sleep(Duration::from_secs(1)) => {
            println!("timeout branch always active");
        }
    }
}

// Reusing futures across select! iterations - must be pinned
async fn pinned_future_reuse() {
    let long_op = long_running();
    tokio::pin!(long_op);

    loop {
        select! {
            result = &mut long_op => {
                println!("completed: {}", result);
                break;
            }
            _ = sleep(Duration::from_millis(100)) => {
                println!("still waiting...");
                // long_op NOT dropped - continues from where it was
            }
        }
    }
}

The Drop Contract

When select! completes one branch, the futures in all other branches are dropped. This is the key behavior to reason about. If those other futures held resources or were mid-operation, their cleanup happens via Drop.

use std::sync::Arc;
use tokio::sync::Semaphore;

struct PermitGuard {
    _permit: tokio::sync::OwnedSemaphorePermit,
    operation: &'static str,
}

impl Drop for PermitGuard {
    fn drop(&mut self) {
        // Permit returned to semaphore automatically when guard drops
        tracing::debug!("Permit released for {}", self.operation);
    }
}

async fn guarded_operation(semaphore: Arc<Semaphore>) -> PermitGuard {
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    PermitGuard { _permit: permit, operation: "db-write" }
}

async fn race_with_cleanup(semaphore: Arc<Semaphore>) {
    select! {
        _guard = guarded_operation(Arc::clone(&semaphore)) => {
            // guard is held in this branch while work happens
            do_work().await;
            // guard dropped at end of branch - permit returned
        }
        _ = sleep(Duration::from_secs(5)) => {
            // guarded_operation future was dropped here
            // If the permit was already acquired, it is returned via Drop
            // If still waiting for permit, acquisition is cancelled safely
            println!("timed out");
        }
    }
}

Timeout Patterns

use tokio::time::timeout;

// Simple timeout wrapper
async fn with_timeout() -> Result<Data, &'static str> {
    match timeout(Duration::from_secs(5), fetch_data()).await {
        Ok(result) => Ok(result),
        Err(_elapsed) => Err("operation timed out"),
    }
}

// Timeout with retry
async fn fetch_with_retry(max_attempts: u32) -> anyhow::Result<Data> {
    let mut last_err = anyhow::anyhow!("no attempts made");

    for attempt in 1..=max_attempts {
        match timeout(Duration::from_secs(10), fetch_data()).await {
            Ok(result) => return Ok(result),
            Err(_) => {
                last_err = anyhow::anyhow!("attempt {} timed out", attempt);
                tracing::warn!("Attempt {} timed out, retrying", attempt);
                sleep(Duration::from_millis(100 * attempt as u64)).await;
            }
        }
    }

    Err(last_err)
}

What Comes Next

Part 8 covers the common pitfalls that cause async Rust services to degrade under load: blocking in async context, holding locks across await points, and CPU-bound work that starves the scheduler. Understanding these is the difference between a service that performs well at 100 RPS and one that holds up at 10,000.
