Message Passing and Channels: Concurrency Without Shared Mutable State
Shared mutable state + threads is where bugs breed.
Message passing is an alternative model:
- producers send messages
- consumers receive messages
- you transfer ownership of data through the channel
It doesn’t magically remove complexity, but it often localizes it.
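To make the ownership point concrete, here is a minimal Rust sketch. The helper name `send_owned_buffer` is made up for this illustration; it only assumes the standard library's `std::sync::mpsc` channel. The buffer is moved into the channel, so the sending thread can no longer touch it and the worker needs no lock.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: sends a heap-allocated buffer to a worker thread
// and returns the length the worker observed after appending one byte.
fn send_owned_buffer() -> usize {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    let worker = thread::spawn(move || {
        // The worker now owns the Vec; no lock is needed to mutate it.
        let mut buf = rx.recv().expect("sender dropped before sending");
        buf.push(0xFF);
        buf.len()
    });

    let payload = vec![1u8, 2, 3];
    tx.send(payload).expect("worker hung up");
    // `payload` was moved into the channel. Using it here would be a
    // compile-time error, so the two threads cannot alias the buffer.

    worker.join().expect("worker panicked")
}

fn main() {
    println!("worker saw {} bytes", send_owned_buffer());
}
```

In C or Zig the same discipline applies, but it is a convention you enforce by hand: after sending a pointer, the sender must treat it as gone.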
1) Channel design space
A channel typically has:
- a buffer (0-sized “rendezvous”, bounded, or unbounded)
- synchronization around enqueue/dequeue
- shutdown semantics (close, disconnect, drop)
Key decisions:
- bounded vs unbounded
- blocking vs non-blocking
- single vs multi-producer/consumer
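Two of these knobs can be probed directly with Rust's standard library. The sketch below (the helper `probe_nonblocking` is a name invented here) uses an unbounded channel and the non-blocking `try_recv`, which distinguishes "nothing yet" from "nothing ever again".

```rust
use std::sync::mpsc::{self, TryRecvError};

// Hypothetical helper: reports what a non-blocking receive sees
// (1) on an open but empty channel, (2) once a value is queued,
// (3) after draining, once every sender is gone.
fn probe_nonblocking() -> (bool, Option<u32>, bool) {
    // Unbounded: `send` never blocks, whatever the consumer is doing.
    let (tx, rx) = mpsc::channel::<u32>();

    let empty = matches!(rx.try_recv(), Err(TryRecvError::Empty));

    tx.send(7).expect("receiver is still alive");
    let got = rx.try_recv().ok();

    drop(tx); // last sender gone: the channel is now disconnected
    let disconnected = matches!(rx.try_recv(), Err(TryRecvError::Disconnected));

    (empty, got, disconnected)
}

fn main() {
    println!("{:?}", probe_nonblocking());
}
```

The blocking counterpart is `recv`, which parks the thread until a value arrives or the channel disconnects.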
2) Bounded channels = backpressure
Bounded channels are the default choice for systems code.
If producers outrun consumers:
- a bounded channel forces producers to block or shed load
- an unbounded channel lets the queue grow until the process runs out of memory
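Shedding load can be sketched with `try_send` on a bounded channel. This is an illustration under simplifying assumptions: the helper `offer_all` is hypothetical, and nothing consumes while it runs, so the outcome is deterministic. A real producer would also decide what to do with the dropped items (count them, log them, retry later).

```rust
use std::sync::mpsc::{self, TrySendError};

// Hypothetical helper: offers `n` items to a channel of capacity `cap`
// while nothing is consuming, and returns (accepted, dropped).
fn offer_all(cap: usize, n: u64) -> (u64, u64) {
    let (tx, rx) = mpsc::sync_channel::<u64>(cap);
    let (mut accepted, mut dropped) = (0u64, 0u64);

    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer full: shed the item instead of blocking the producer.
            Err(TrySendError::Full(_)) => dropped += 1,
            // Receiver gone: nobody will ever read, so stop producing.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    drop(rx); // the receiver stays alive until the loop is done
    (accepted, dropped)
}

fn main() {
    let (accepted, dropped) = offer_all(4, 10);
    println!("accepted={accepted} dropped={dropped}");
}
```

Replacing `try_send` with `send` gives the other behavior: the producer blocks once the buffer is full and resumes when the consumer catches up.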
3) C: a simple bounded channel
This is essentially a fixed-size ring buffer guarded by a mutex and two condition variables.
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

#define CAP 1024

typedef struct {
    uint64_t buf[CAP];        // ring buffer storage
    size_t head, tail, len;   // read index, write index, item count
    int closed;               // set once by chan_close
    pthread_mutex_t mu;       // protects every field above
    pthread_cond_t not_empty; // signaled after an enqueue
    pthread_cond_t not_full;  // signaled after a dequeue
} chan_t;

static void chan_init(chan_t *c) {
    c->head = c->tail = c->len = 0;
    c->closed = 0;
    pthread_mutex_init(&c->mu, NULL);
    pthread_cond_init(&c->not_empty, NULL);
    pthread_cond_init(&c->not_full, NULL);
}

// Marks the channel closed and wakes every blocked sender and receiver.
static void chan_close(chan_t *c) {
    pthread_mutex_lock(&c->mu);
    c->closed = 1;
    pthread_cond_broadcast(&c->not_empty);
    pthread_cond_broadcast(&c->not_full);
    pthread_mutex_unlock(&c->mu);
}

// Blocks while the channel is full. Returns 1 on success, 0 if closed.
static int chan_send(chan_t *c, uint64_t v) {
    pthread_mutex_lock(&c->mu);
    // Loop, not `if`: condition variables can wake spuriously.
    while (!c->closed && c->len == CAP) {
        pthread_cond_wait(&c->not_full, &c->mu);
    }
    if (c->closed) {
        pthread_mutex_unlock(&c->mu);
        return 0;
    }
    c->buf[c->tail] = v;
    c->tail = (c->tail + 1) % CAP;
    c->len++;
    pthread_cond_signal(&c->not_empty);
    pthread_mutex_unlock(&c->mu);
    return 1;
}

// Blocks while the channel is empty and open. Buffered items are still
// delivered after close. Returns 1 if received, 0 if closed and empty.
static int chan_recv(chan_t *c, uint64_t *out) {
    pthread_mutex_lock(&c->mu);
    while (!c->closed && c->len == 0) {
        pthread_cond_wait(&c->not_empty, &c->mu);
    }
    if (c->len == 0 && c->closed) {
        pthread_mutex_unlock(&c->mu);
        return 0;
    }
    *out = c->buf[c->head];
    c->head = (c->head + 1) % CAP;
    c->len--;
    pthread_cond_signal(&c->not_full);
    pthread_mutex_unlock(&c->mu);
    return 1;
}
4) Zig: bounded queue + condition variables
const std = @import("std");

pub const Channel = struct {
    const CAP: usize = 1024;

    buf: [CAP]u64 = undefined,
    head: usize = 0,
    tail: usize = 0,
    len: usize = 0,
    closed: bool = false,
    mu: std.Thread.Mutex = .{},
    not_empty: std.Thread.Condition = .{},
    not_full: std.Thread.Condition = .{},

    pub fn close(self: *Channel) void {
        self.mu.lock();
        defer self.mu.unlock();
        self.closed = true;
        self.not_empty.broadcast();
        self.not_full.broadcast();
    }

    pub fn send(self: *Channel, v: u64) bool {
        self.mu.lock();
        defer self.mu.unlock();
        while (!self.closed and self.len == CAP) {
            self.not_full.wait(&self.mu);
        }
        if (self.closed) return false;
        self.buf[self.tail] = v;
        self.tail = (self.tail + 1) % CAP;
        self.len += 1;
        self.not_empty.signal();
        return true;
    }

    pub fn recv(self: *Channel) ?u64 {
        self.mu.lock();
        defer self.mu.unlock();
        while (!self.closed and self.len == 0) {
            self.not_empty.wait(&self.mu);
        }
        if (self.len == 0 and self.closed) return null;
        const v = self.buf[self.head];
        self.head = (self.head + 1) % CAP;
        self.len -= 1;
        self.not_full.signal();
        return v;
    }
};
5) Rust: bounded channels in std
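Rust’s std::sync::mpsc::sync_channel is a bounded MPSC (multi-producer, single-consumer) channel. send blocks while the buffer is full, and the channel closes once every sender has been dropped.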
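use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::sync_channel::<u64>(1024);

    let prod = thread::spawn({
        let tx = tx.clone();
        move || {
            for i in 0..1_000_000u64 {
                // Blocks while the buffer is full; fails only if the receiver is gone.
                tx.send(i).unwrap();
            }
            // The cloned sender is dropped when this closure returns.
        }
    });

    let cons = thread::spawn(move || {
        let mut sum = 0u64;
        // recv() returns Err once all senders are dropped and the buffer is empty.
        while let Ok(v) = rx.recv() {
            sum = sum.wrapping_add(v);
        }
        sum
    });

    prod.join().unwrap();
    drop(tx); // drop the last sender: this closes the channel

    let sum = cons.join().unwrap();
    println!("sum={sum}");
}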
6) Patterns you’ll actually use
- Fan-out: one producer sends tasks to multiple workers.
- Fan-in: multiple workers send results to one aggregator.
- Work stealing: idle workers take tasks from busy workers' queues. More complex, and usually built on per-worker deques rather than channels.
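Fan-out and fan-in compose naturally into a small pipeline. The sketch below is one way to do it with only the standard library. The helper `sum_of_squares` and the capacity of 16 are arbitrary choices for illustration. Because std's `Receiver` is single-consumer, the workers share it behind an `Arc<Mutex<_>>`, which is a workaround in this sketch rather than the only option (a multi-consumer channel from a third-party crate would avoid it).

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: squares 1..=n on `workers` threads and sums the results.
fn sum_of_squares(n: u64, workers: usize) -> u64 {
    assert!(workers > 0, "this sketch needs at least one worker");
    let (task_tx, task_rx) = mpsc::sync_channel::<u64>(16);
    let (result_tx, result_rx) = mpsc::sync_channel::<u64>(16);

    // Fan-out: every worker pulls from the same task receiver.
    let task_rx = Arc::new(Mutex::new(task_rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let task_rx = Arc::clone(&task_rx);
            let result_tx = result_tx.clone(); // fan-in: one sender per worker
            thread::spawn(move || loop {
                // The lock is released at the end of this statement.
                let task = task_rx.lock().unwrap().recv();
                match task {
                    Ok(x) => {
                        if result_tx.send(x * x).is_err() {
                            break; // aggregator is gone
                        }
                    }
                    Err(_) => break, // task channel closed and drained
                }
            })
        })
        .collect();
    drop(result_tx); // only the workers' clones keep the result channel open

    let producer = thread::spawn(move || {
        for x in 1..=n {
            if task_tx.send(x).is_err() {
                break;
            }
        }
        // task_tx is dropped here, which closes the task channel.
    });

    // Aggregator: ends when every worker has dropped its result sender.
    let total: u64 = result_rx.iter().sum();

    producer.join().unwrap();
    for h in handles {
        h.join().unwrap();
    }
    total
}

fn main() {
    println!("{}", sum_of_squares(10, 4)); // prints 385
}
```

Both channels are bounded, so a slow aggregator slows the workers, and slow workers slow the producer: backpressure propagates through the whole pipeline.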
7) Shutdown and cancellation
Decide explicitly:
- who closes the channel
- how receivers stop (close + drain, or immediate)
- how producers react when send fails
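The two receiver-side policies look like this in Rust, as a sketch with hypothetical helper names. In `std::sync::mpsc`, "close" means dropping every sender, and a receiver that stops early signals it by being dropped, which makes later sends fail.

```rust
use std::sync::mpsc;
use std::thread;

// Close + drain: values queued before the last sender is dropped are
// still delivered; iteration ends only once the buffer is empty.
fn close_then_drain() -> Vec<u32> {
    let (tx, rx) = mpsc::sync_channel::<u32>(8);
    for v in [1, 2, 3] {
        tx.send(v).expect("receiver is alive");
    }
    drop(tx); // the sender side closes the channel
    rx.iter().collect()
}

// Immediate stop: the receiver hangs up after `limit` items, and the
// producer treats a failed send as its signal to quit.
// Returns how many sends succeeded.
fn producer_stops_on_hangup(limit: usize) -> u64 {
    let (tx, rx) = mpsc::sync_channel::<u64>(4);

    let producer = thread::spawn(move || {
        let mut sent = 0u64;
        for i in 0u64.. {
            if tx.send(i).is_err() {
                break; // receiver dropped: stop producing
            }
            sent += 1;
        }
        sent
    });

    let received: Vec<u64> = rx.iter().take(limit).collect();
    drop(rx); // hang up; a blocked or later send now returns Err
    let sent = producer.join().expect("producer panicked");
    assert!(sent >= received.len() as u64);
    sent
}

fn main() {
    println!("drained: {:?}", close_then_drain());
    println!("sends before hangup: {}", producer_stops_on_hangup(10));
}
```

Note that in the second case a few sends succeed without ever being received: items still in the buffer when the receiver hangs up are discarded. If losing them matters, prefer close + drain.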
References
- POSIX condition variables: https://man7.org/linux/man-pages/man3/pthread_cond_wait.3p.html
- Rust channels: https://doc.rust-lang.org/std/sync/mpsc/
- Go’s channel patterns (conceptual inspiration): https://go.dev/doc/effective_go#channels
- Little’s Law: https://en.wikipedia.org/wiki/Little%27s_law