Thread Pools and Backpressure: Scaling Work Without Melting Down
Thread pools are easy to create and hard to run safely. This article covers bounded queues, backpressure, shutdown, and avoiding unbounded memory growth, with examples in C, Zig, and Rust.
A thread pool typically looks like:
- N worker threads
- a queue of tasks
- producers push tasks into the queue
- workers pop tasks and execute
The dangerous part: if producers can enqueue tasks faster than workers can process them, your program may:
- allocate unbounded memory
- increase latency dramatically
- thrash caches
- crash under load
This is why backpressure matters.
1) Backpressure in one sentence
Backpressure means: when the system is saturated, slow down the producers.
In practice:
- bound the queue
- block, drop, or shed load when full
2) C: a bounded work queue with pthreads
Below is a minimal bounded queue that blocks producers when full.
#include <pthread.h>
#include <stddef.h>
#define QCAP 1024
/* Signature of a task callback: takes one untyped pointer argument and returns nothing. */
typedef void (*task_fn)(void *);
/* One unit of work stored in the queue: a callback plus the pointer kept alongside it. */
typedef struct {
/* Function to run for this task. */
task_fn fn;
/* Untyped pointer stored with the task; presumably handed to fn by the worker
 * (the call site is not part of this snippet). */
void *arg;
} task_t;
/* Fixed-capacity ring buffer of tasks. q_push, q_pop and q_stop access every
 * field below with mu held. */
typedef struct {
/* Ring storage; holds at most QCAP tasks. */
task_t buf[QCAP];
/* head: index of the next task q_pop returns; tail: index of the next slot
 * q_push fills; len: number of tasks currently stored. */
size_t head, tail, len;
/* Set to 1 by q_stop; once set, q_push rejects new tasks and q_pop returns 0
 * as soon as the buffer is empty. */
int shutdown;
/* Guards all of the fields above. */
pthread_mutex_t mu;
/* Signalled by q_push after adding a task; broadcast by q_stop. */
pthread_cond_t not_empty;
/* Signalled by q_pop after removing a task; broadcast by q_stop. */
pthread_cond_t not_full;
} queue_t;
/* Put the queue into its initial state: no tasks stored, not shut down, and
 * the mutex and both condition variables created with default attributes.
 * The return values of the pthread init calls are not checked. */
static void q_init(queue_t *q) {
    /* Empty ring: both cursors at slot 0 and nothing counted. */
    q->len = 0;
    q->tail = 0;
    q->head = 0;

    q->shutdown = 0;

    pthread_mutex_init(&q->mu, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}
/* Append task t to the queue, blocking while the queue is full.
 *
 * Returns 1 once the task has been stored, or 0 if the queue has been shut
 * down (in which case nothing is stored). */
static int q_push(queue_t *q, task_t t) {
    int accepted = 0;

    pthread_mutex_lock(&q->mu);
    for (;;) {
        /* Shutdown wins over free space: no new work is accepted after q_stop. */
        if (q->shutdown) {
            break;
        }
        if (q->len != QCAP) {
            size_t slot = q->tail;

            q->buf[slot] = t;
            q->tail = (slot + 1) % QCAP;
            q->len += 1;
            /* Wake one thread waiting in q_pop, if any. */
            pthread_cond_signal(&q->not_empty);
            accepted = 1;
            break;
        }
        /* Full: sleep until q_pop frees a slot or q_stop broadcasts. */
        pthread_cond_wait(&q->not_full, &q->mu);
    }
    pthread_mutex_unlock(&q->mu);

    return accepted;
}
/* Remove the oldest task from the queue into *out, blocking while the queue
 * is empty and not shut down.
 *
 * Returns 1 when a task was written to *out. Returns 0, leaving *out
 * untouched, only when the queue is both shut down and empty, so tasks that
 * were queued before q_stop are still handed out. */
static int q_pop(queue_t *q, task_t *out) {
    int got = 0;

    pthread_mutex_lock(&q->mu);

    /* Sleep until q_push adds a task or q_stop broadcasts. */
    while (q->len == 0 && !q->shutdown) {
        pthread_cond_wait(&q->not_empty, &q->mu);
    }

    /* Leaving the loop with len == 0 can only mean shutdown was requested. */
    if (q->len != 0) {
        size_t slot = q->head;

        *out = q->buf[slot];
        q->head = (slot + 1) % QCAP;
        q->len -= 1;
        /* Wake one thread waiting in q_push, if any. */
        pthread_cond_signal(&q->not_full);
        got = 1;
    }

    pthread_mutex_unlock(&q->mu);
    return got;
}
/* Mark the queue as shut down and wake every thread blocked in q_push or
 * q_pop so each one re-checks the shutdown flag. */
static void q_stop(queue_t *q) {
    pthread_mutex_t *lock = &q->mu;

    pthread_mutex_lock(lock);
    q->shutdown = 1;
    /* Broadcast rather than signal: all waiters must observe the flag. */
    pthread_cond_broadcast(&q->not_empty);
    pthread_cond_broadcast(&q->not_full);
    pthread_mutex_unlock(lock);
}
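Workers call q_pop, producers call q_push. When the queue is full, producers block in q_push until a worker frees a slot: this is backpressure. After q_stop, q_push returns 0 without enqueuing anything, while q_pop keeps returning the tasks that are already queued and returns 0 only once the queue is empty.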
3) Zig: a bounded channel-like queue
const std = @import("std");
/// One unit of work stored in the queue: a function pointer plus the opaque pointer kept alongside it.
const Task = struct {
/// Function to run; takes one optional opaque pointer and returns nothing.
fn_ptr: *const fn (?*anyopaque) void,
/// Optional opaque pointer stored with the task; presumably passed to fn_ptr by the worker
/// (the call site is not part of this snippet).
arg: ?*anyopaque,
};
/// Fixed-capacity ring buffer of tasks guarded by a mutex and two condition variables.
const Queue = struct {
    /// Maximum number of tasks the ring can hold.
    const CAP: usize = 1024;

    buf: [CAP]Task = undefined,
    head: usize = 0,
    tail: usize = 0,
    len: usize = 0,
    shutdown: bool = false,
    mu: std.Thread.Mutex = .{},
    not_empty: std.Thread.Condition = .{},
    not_full: std.Thread.Condition = .{},

    /// Appends `t`, blocking while the ring is full.
    /// Returns true once the task is stored, or false if the queue has been
    /// stopped (nothing is stored in that case).
    fn push(self: *Queue, t: Task) bool {
        self.mu.lock();
        defer self.mu.unlock();

        while (true) {
            // Shutdown wins over free space: no new work after stop().
            if (self.shutdown) return false;
            if (self.len != CAP) break;
            self.not_full.wait(&self.mu);
        }

        const slot = self.tail;
        self.buf[slot] = t;
        self.tail = (slot + 1) % CAP;
        self.len += 1;
        // Wake one thread waiting in pop(), if any.
        self.not_empty.signal();
        return true;
    }

    /// Removes and returns the oldest task, blocking while the ring is empty
    /// and not stopped. Returns null only when the queue is both stopped and
    /// empty, so tasks queued before stop() are still handed out.
    fn pop(self: *Queue) ?Task {
        self.mu.lock();
        defer self.mu.unlock();

        while (self.len == 0) {
            if (self.shutdown) return null;
            self.not_empty.wait(&self.mu);
        }

        const slot = self.head;
        const task = self.buf[slot];
        self.head = (slot + 1) % CAP;
        self.len -= 1;
        // Wake one thread waiting in push(), if any.
        self.not_full.signal();
        return task;
    }

    /// Marks the queue as stopped and wakes every thread blocked in push()
    /// or pop() so each one re-checks the shutdown flag.
    fn stop(self: *Queue) void {
        self.mu.lock();
        defer self.mu.unlock();

        self.shutdown = true;
        self.not_empty.broadcast();
        self.not_full.broadcast();
    }
};
4) Rust: bounded channel as a thread pool queue
Rust already gives you strong building blocks.
use std::sync::{mpsc, Arc};
use std::thread;
/// Runs one million small jobs on four worker threads fed through a bounded
/// channel, then shuts the pool down by draining the queue.
///
/// Shutdown is explicit: the sender is dropped once all jobs are submitted,
/// and every worker is joined before `main` returns. Without the join, the
/// process could exit while jobs were still sitting in the channel.
fn main() {
    // Capacity 1024: `send` blocks once that many jobs are waiting.
    let (tx, rx) = mpsc::sync_channel::<Box<dyn FnOnce() + Send>>(1024);
    // The receiver is single-consumer, so the workers share it behind a mutex.
    let rx = Arc::new(std::sync::Mutex::new(rx));

    let mut workers = Vec::with_capacity(4);
    for _ in 0..4 {
        let rx = rx.clone();
        workers.push(thread::spawn(move || loop {
            // Hold the lock only while receiving, so the job itself runs
            // without blocking the other workers.
            let job = {
                let guard = rx.lock().unwrap();
                guard.recv()
            };
            match job {
                Ok(f) => f(),
                // The sender is gone and the channel is empty: nothing left to do.
                Err(_) => break,
            }
        }));
    }

    // Producer: blocks whenever the queue is full (backpressure).
    for i in 0..1_000_000u64 {
        let t = Box::new(move || {
            let _ = i.wrapping_mul(2);
        });
        tx.send(t).unwrap();
    }

    // Closing the channel lets `recv` return `Err` once the remaining jobs
    // have been taken, which ends each worker loop.
    drop(tx);

    // Wait for the workers so every queued job runs before the process exits.
    for worker in workers {
        worker.join().unwrap();
    }
}
The key: sync_channel is bounded and enforces backpressure.
5) Shutdown semantics
Decide what you want:
- drain queued tasks then exit
- cancel queued tasks
- immediate shutdown
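Make it explicit. For example, the C and Zig queues above implement the first option: once stopped, they reject new tasks, but workers keep popping until the queue is empty.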
References
- POSIX threads: https://man7.org/linux/man-pages/man7/pthreads.7.html
- Rust mpsc::sync_channel: https://doc.rust-lang.org/std/sync/mpsc/fn.sync_channel.html
- Little’s Law (queueing intuition): https://en.wikipedia.org/wiki/Little%27s_law