Message Passing and Channels: Concurrency Without Shared Mutable State

This article covers Message Passing and Channels: Concurrency Without Shared Mutable State. Learn message-passing concurrency: bounded vs unbounded channels, backpressure, fan-in/fan-out, and shutdown semantics. Includes C, Zig, and Rust...

Shared mutable state + threads is where bugs breed.

Message passing is an alternative model:

  • producers send messages
  • consumers receive messages
  • you transfer ownership of data through the channel

It doesn’t magically remove complexity, but it often localizes it.

1) Channel design space

A channel typically has:

  • a buffer (0-sized “rendezvous”, bounded, or unbounded)
  • synchronization around enqueue/dequeue
  • shutdown semantics (close, disconnect, drop)

Key decisions:

  • bounded vs unbounded
  • blocking vs non-blocking
  • single vs multi-producer/consumer

2) Bounded channels = backpressure

Bounded channels are the default choice for systems code.

If producers outrun consumers:

  • bounded channel forces producers to block / shed load
  • unbounded channel grows memory usage until you crash

3) C: a simple bounded channel

This is essentially a queue + mutex + condvars.

#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

#define CAP 1024

typedef struct {
    uint64_t buf[CAP];
    size_t head, tail, len;
    int closed;
    pthread_mutex_t mu;
    pthread_cond_t not_empty;
    pthread_cond_t not_full;
} chan_t;

static void chan_init(chan_t *c) {
    c->head = c->tail = c->len = 0;
    c->closed = 0;
    pthread_mutex_init(&c->mu, NULL);
    pthread_cond_init(&c->not_empty, NULL);
    pthread_cond_init(&c->not_full, NULL);
}

static void chan_close(chan_t *c) {
    pthread_mutex_lock(&c->mu);
    c->closed = 1;
    pthread_cond_broadcast(&c->not_empty);
    pthread_cond_broadcast(&c->not_full);
    pthread_mutex_unlock(&c->mu);
}

// Returns 1 on success, 0 if closed.
static int chan_send(chan_t *c, uint64_t v) {
    pthread_mutex_lock(&c->mu);
    while (!c->closed && c->len == CAP) {
        pthread_cond_wait(&c->not_full, &c->mu);
    }
    if (c->closed) {
        pthread_mutex_unlock(&c->mu);
        return 0;
    }
    c->buf[c->tail] = v;
    c->tail = (c->tail + 1) % CAP;
    c->len++;
    pthread_cond_signal(&c->not_empty);
    pthread_mutex_unlock(&c->mu);
    return 1;
}

// Returns 1 if received, 0 if closed and empty.
static int chan_recv(chan_t *c, uint64_t *out) {
    pthread_mutex_lock(&c->mu);
    while (!c->closed && c->len == 0) {
        pthread_cond_wait(&c->not_empty, &c->mu);
    }
    if (c->len == 0 && c->closed) {
        pthread_mutex_unlock(&c->mu);
        return 0;
    }
    *out = c->buf[c->head];
    c->head = (c->head + 1) % CAP;
    c->len--;
    pthread_cond_signal(&c->not_full);
    pthread_mutex_unlock(&c->mu);
    return 1;
}

4) Zig: bounded queue + condition variables

const std = @import("std");

pub const Channel = struct {
    const CAP: usize = 1024;

    buf: [CAP]u64 = undefined,
    head: usize = 0,
    tail: usize = 0,
    len: usize = 0,
    closed: bool = false,

    mu: std.Thread.Mutex = .{},
    not_empty: std.Thread.Condition = .{},
    not_full: std.Thread.Condition = .{},

    pub fn close(self: *Channel) void {
        self.mu.lock();
        self.closed = true;
        self.not_empty.broadcast();
        self.not_full.broadcast();
        self.mu.unlock();
    }

    pub fn send(self: *Channel, v: u64) bool {
        self.mu.lock();
        defer self.mu.unlock();

        while (!self.closed and self.len == CAP) {
            self.not_full.wait(&self.mu);
        }
        if (self.closed) return false;

        self.buf[self.tail] = v;
        self.tail = (self.tail + 1) % CAP;
        self.len += 1;
        self.not_empty.signal();
        return true;
    }

    pub fn recv(self: *Channel) ?u64 {
        self.mu.lock();
        defer self.mu.unlock();

        while (!self.closed and self.len == 0) {
            self.not_empty.wait(&self.mu);
        }
        if (self.len == 0 and self.closed) return null;

        const v = self.buf[self.head];
        self.head = (self.head + 1) % CAP;
        self.len -= 1;
        self.not_full.signal();
        return v;
    }
};

5) Rust: bounded channels in std

Rust’s sync_channel is a bounded MPSC channel.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::sync_channel::<u64>(1024);

    let prod = thread::spawn({
        let tx = tx.clone();
        move || {
            for i in 0..1_000_000u64 {
                tx.send(i).unwrap();
            }
        }
    });

    let cons = thread::spawn(move || {
        let mut sum = 0u64;
        while let Ok(v) = rx.recv() {
            sum = sum.wrapping_add(v);
        }
        sum
    });

    prod.join().unwrap();
    drop(tx); // close
    let sum = cons.join().unwrap();
    println!("sum={sum}");
}

6) Patterns you’ll actually use

  • Fan-out: one producer sends tasks to multiple workers.
  • Fan-in: multiple workers send results to one aggregator.
  • Work stealing: more complex, usually needs deques.

7) Shutdown and cancellation

Decide explicitly:

  • who closes the channel
  • how receivers stop (close + drain, or immediate)
  • how producers react when send fails

References