Thread Pools and Backpressure: Scaling Work Without Melting Down

Thread pools are easy to create and hard to run safely. This article covers bounded queues, backpressure, shutdown semantics, and how to avoid unbounded memory growth, with examples in C, Zig, and Rust.

A thread pool typically looks like:

  • N worker threads
  • a queue of tasks
  • producers push tasks into the queue
  • workers pop tasks and execute

The dangerous part: if producers can enqueue tasks faster than workers can process them, your program may:

  • allocate unbounded memory
  • increase latency dramatically
  • thrash caches
  • crash under load

This is why backpressure matters.

1) Backpressure in one sentence

Backpressure means: when the system is saturated, slow down the producers.

In practice:

  • bound the queue
  • block, drop, or shed load when full

2) C: a bounded work queue with pthreads

Below is a minimal bounded queue that blocks producers when full.

#include <pthread.h>
#include <stddef.h>

#define QCAP 1024

typedef void (*task_fn)(void *);

typedef struct {
    task_fn fn;
    void *arg;
} task_t;

typedef struct {
    task_t buf[QCAP];
    size_t head, tail, len;
    int shutdown;
    pthread_mutex_t mu;
    pthread_cond_t not_empty;
    pthread_cond_t not_full;
} queue_t;

static void q_init(queue_t *q) {
    q->head = q->tail = q->len = 0;
    q->shutdown = 0;
    pthread_mutex_init(&q->mu, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

static int q_push(queue_t *q, task_t t) {
    pthread_mutex_lock(&q->mu);
    /* Backpressure: block the producer while the queue is full. */
    while (!q->shutdown && q->len == QCAP) {
        pthread_cond_wait(&q->not_full, &q->mu);
    }
    if (q->shutdown) {
        pthread_mutex_unlock(&q->mu);
        return 0;
    }
    q->buf[q->tail] = t;
    q->tail = (q->tail + 1) % QCAP;
    q->len++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mu);
    return 1;
}

static int q_pop(queue_t *q, task_t *out) {
    pthread_mutex_lock(&q->mu);
    while (!q->shutdown && q->len == 0) {
        pthread_cond_wait(&q->not_empty, &q->mu);
    }
    /* Drain semantics: after shutdown, keep handing out queued tasks
       and report "done" only once the queue is empty. */
    if (q->len == 0 && q->shutdown) {
        pthread_mutex_unlock(&q->mu);
        return 0;
    }
    *out = q->buf[q->head];
    q->head = (q->head + 1) % QCAP;
    q->len--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mu);
    return 1;
}

static void q_stop(queue_t *q) {
    pthread_mutex_lock(&q->mu);
    q->shutdown = 1;
    pthread_cond_broadcast(&q->not_empty); /* wake blocked workers   */
    pthread_cond_broadcast(&q->not_full);  /* wake blocked producers */
    pthread_mutex_unlock(&q->mu);
}

Workers call q_pop, producers call q_push. When the queue is full, producers block: this is backpressure.
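
Blocking the producer is only one policy. If you would rather drop or shed load when the queue is full (the other options from section 1), a non-blocking push can report failure and let the caller decide. The sketch below assumes the queue_t type and QCAP from the listing above; q_try_push and its return convention are illustrative, not part of any standard API.

/* Non-blocking push: returns 1 if the task was enqueued, 0 if the
 * queue is full or shutting down. The caller can drop the task,
 * retry later, or shed load further upstream. */
static int q_try_push(queue_t *q, task_t t) {
    pthread_mutex_lock(&q->mu);
    if (q->shutdown || q->len == QCAP) {
        pthread_mutex_unlock(&q->mu);
        return 0;
    }
    q->buf[q->tail] = t;
    q->tail = (q->tail + 1) % QCAP;
    q->len++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mu);
    return 1;
}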

3) Zig: a bounded channel-like queue

const std = @import("std");

const Task = struct {
    fn_ptr: *const fn (?*anyopaque) void,
    arg: ?*anyopaque,
};

const Queue = struct {
    const CAP: usize = 1024;

    buf: [CAP]Task = undefined,
    head: usize = 0,
    tail: usize = 0,
    len: usize = 0,
    shutdown: bool = false,

    mu: std.Thread.Mutex = .{},
    not_empty: std.Thread.Condition = .{},
    not_full: std.Thread.Condition = .{},

    fn push(self: *Queue, t: Task) bool {
        self.mu.lock();
        defer self.mu.unlock();
        // Backpressure: block the producer while the buffer is full.
        while (!self.shutdown and self.len == CAP) {
            self.not_full.wait(&self.mu);
        }
        if (self.shutdown) return false;
        self.buf[self.tail] = t;
        self.tail = (self.tail + 1) % CAP;
        self.len += 1;
        self.not_empty.signal();
        return true;
    }

    fn pop(self: *Queue) ?Task {
        self.mu.lock();
        defer self.mu.unlock();
        while (!self.shutdown and self.len == 0) {
            self.not_empty.wait(&self.mu);
        }
        // Drain: keep returning queued tasks after stop(); null only when empty.
        if (self.len == 0 and self.shutdown) return null;
        const t = self.buf[self.head];
        self.head = (self.head + 1) % CAP;
        self.len -= 1;
        self.not_full.signal();
        return t;
    }

    fn stop(self: *Queue) void {
        self.mu.lock();
        self.shutdown = true;
        self.not_empty.broadcast();
        self.not_full.broadcast();
        self.mu.unlock();
    }
};

4) Rust: bounded channel as a thread pool queue

Rust already gives you strong building blocks.

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // Bounded channel: send() blocks once 1024 jobs are queued.
    let (tx, rx) = mpsc::sync_channel::<Box<dyn FnOnce() + Send>>(1024);
    let rx = Arc::new(Mutex::new(rx));

    let workers: Vec<_> = (0..4)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // Hold the lock only while receiving, not while running the job.
                let job = {
                    let guard = rx.lock().unwrap();
                    guard.recv()
                };
                match job {
                    Ok(f) => f(),
                    Err(_) => break, // channel closed: all senders are gone
                }
            })
        })
        .collect();

    // Producer: send() blocks while the queue is full -- backpressure.
    for i in 0..1_000_000u64 {
        let job = Box::new(move || {
            let _ = i.wrapping_mul(2);
        });
        tx.send(job).unwrap();
    }

    // Drop the only sender so workers see a closed channel once the
    // queue drains, then join them.
    drop(tx);
    for w in workers {
        w.join().unwrap();
    }
}

The key: sync_channel is bounded, so send blocks when the channel is full and the producer is throttled automatically. If you would rather shed load than block, try_send returns an error instead of waiting.

5) Shutdown semantics

Decide what you want:

  • drain: finish everything already queued, then exit
  • cancel: let running tasks finish, but discard anything still queued
  • immediate: stop as soon as possible, abandoning queued and in-flight work

Make it explicit.
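
The pthreads queue from section 2 already implements the drain policy: after q_stop, q_pop keeps returning queued tasks and reports shutdown only once the queue is empty. Below is a minimal sketch of a pool built on it, assuming queue_t and its helper functions from that section are in scope; worker_main, demo_task, and NWORKERS are illustrative names, not part of any standard API.

#define NWORKERS 4

/* Worker loop: run tasks until q_pop reports shutdown and an empty queue. */
static void *worker_main(void *arg) {
    queue_t *q = arg;
    task_t t;
    while (q_pop(q, &t))
        t.fn(t.arg);
    return NULL;
}

/* A trivial task body, just for the sketch. */
static void demo_task(void *arg) {
    (void)arg;
}

int main(void) {
    static queue_t q;
    q_init(&q);

    pthread_t workers[NWORKERS];
    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&workers[i], NULL, worker_main, &q);

    /* Producer: q_push blocks whenever the queue is full (backpressure). */
    for (int i = 0; i < 100000; i++) {
        task_t t = { .fn = demo_task, .arg = NULL };
        q_push(&q, t);
    }

    /* Drain-then-exit: reject new work, let the workers empty the
       queue, then join them. */
    q_stop(&q);
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(workers[i], NULL);
    return 0;
}

Switching to the cancel policy only changes the pop side: return 0 as soon as shutdown is set, regardless of how many tasks are still queued, and anything left in the buffer is discarded. Immediate shutdown goes further and has to interrupt tasks that are already running, which usually means a cooperative cancellation flag checked inside the task itself.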
