Concurrent File Processing Pipelines: Batching, Backpressure, and Work Partitioning

This article builds a practical pipeline that processes many files concurrently without overwhelming the system, using batching, bounded queues for backpressure, and work partitioning across stages.

Processing many files (log ingestion, indexing, backups, static site builds) looks like “just parallelize it”. In practice, naive parallelism causes:

  • too many open file descriptors
  • random I/O and cache thrash
  • unbounded memory growth
  • latency spikes

A good file pipeline uses:

  • bounded queues (backpressure)
  • controlled concurrency
  • batching and locality

1) A pipeline structure that scales

A common design:

  1. Discovery stage: walk directories and produce file paths
  2. Read stage: read file contents (or chunks)
  3. Parse stage: CPU work (parse/transform)
  4. Write stage: output (files, DB, network)

Each stage hands work to the next through a bounded queue, as in the sketch below.
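
A rough sketch of that wiring in C (the pathq_t type is built in section 3; bufq_t, recq_t, and the field names are illustrative placeholders, not a fixed API):

/* Three bounded queues connect the four stages. Because every queue
 * is bounded, a slow downstream stage stalls everything upstream of
 * it instead of letting memory grow without limit. */
typedef struct pathq pathq_t;   /* discovery -> read  */
typedef struct bufq  bufq_t;    /* read      -> parse */
typedef struct recq  recq_t;    /* parse     -> write */

struct pipeline {
    pathq_t *paths;    /* bounded: slow readers stall the walk      */
    bufq_t  *bufs;     /* bounded: slow parsers stall the readers   */
    recq_t  *records;  /* bounded: a slow writer stalls the parsers */
    int n_readers;     /* sized for I/O depth, not core count       */
    int n_parsers;     /* sized roughly to CPU cores                */
};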

2) Key resource limits

  • FD limit: ulimit -n (see the getrlimit sketch after this list)
  • IO depth: disk/NVMe queue depth
  • CPU cores: parse stage typically scales with cores
  • Memory: buffer sizes and queue depth
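
The FD limit in particular is worth checking, and raising up to the hard limit, at startup. A minimal POSIX sketch using getrlimit/setrlimit (raising beyond the hard limit requires privileges):

#include <stdio.h>
#include <sys/resource.h>

int main(void) {
    struct rlimit rl;
    if (getrlimit(RLIMIT_NOFILE, &rl) != 0) { perror("getrlimit"); return 1; }
    printf("soft=%llu hard=%llu\n",
           (unsigned long long)rl.rlim_cur, (unsigned long long)rl.rlim_max);

    rl.rlim_cur = rl.rlim_max;                    /* soft := hard */
    if (setrlimit(RLIMIT_NOFILE, &rl) != 0) perror("setrlimit");
    return 0;
}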

3) C: bounded path queue + worker pool

Below is a simplified pipeline where:

  • producer pushes file paths into a bounded queue
  • N workers pop and process
#include <pthread.h>
#include <stdio.h>
#include <string.h>

#define CAP 1024
#define PATH_MAX_LOCAL 1024

typedef struct {
    char buf[CAP][PATH_MAX_LOCAL];  /* fixed ring of path slots */
    size_t head, tail, len;
    int closed;                     /* set once the producer is done */
    pthread_mutex_t mu;
    pthread_cond_t not_empty;       /* signaled by q_push */
    pthread_cond_t not_full;        /* signaled by q_pop  */
} pathq_t;

static void q_init(pathq_t *q) {
    q->head = q->tail = q->len = 0;
    q->closed = 0;
    pthread_mutex_init(&q->mu, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

/* Blocking push: waits while the queue is full; returns 0 if the
 * queue was closed (item dropped), 1 on success. */
static int q_push(pathq_t *q, const char *p) {
    pthread_mutex_lock(&q->mu);
    while (!q->closed && q->len == CAP) pthread_cond_wait(&q->not_full, &q->mu);
    if (q->closed) { pthread_mutex_unlock(&q->mu); return 0; }

    strncpy(q->buf[q->tail], p, PATH_MAX_LOCAL - 1);
    q->buf[q->tail][PATH_MAX_LOCAL - 1] = 0;
    q->tail = (q->tail + 1) % CAP;
    q->len++;

    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mu);
    return 1;
}

/* Blocking pop: waits while the queue is empty; returns 0 once the
 * queue is closed and drained, 1 with a path copied into `out`. */
static int q_pop(pathq_t *q, char out[PATH_MAX_LOCAL]) {
    pthread_mutex_lock(&q->mu);
    while (!q->closed && q->len == 0) pthread_cond_wait(&q->not_empty, &q->mu);
    if (q->len == 0 && q->closed) { pthread_mutex_unlock(&q->mu); return 0; }

    strncpy(out, q->buf[q->head], PATH_MAX_LOCAL); /* slot is NUL-terminated by q_push */
    q->head = (q->head + 1) % CAP;
    q->len--;

    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mu);
    return 1;
}

/* Close the queue: producers stop pushing; workers drain what is
 * left and then exit. Broadcast wakes every blocked thread. */
static void q_close(pathq_t *q) {
    pthread_mutex_lock(&q->mu);
    q->closed = 1;
    pthread_cond_broadcast(&q->not_empty);
    pthread_cond_broadcast(&q->not_full);
    pthread_mutex_unlock(&q->mu);
}

static void process_file(const char *path) {
    // placeholder: do real I/O + parse here
    (void)path;
}

typedef struct { pathq_t *q; } worker_arg_t;

/* Worker: loops until the queue is closed and fully drained. */
static void *worker(void *arg) {
    worker_arg_t *wa = (worker_arg_t*)arg;
    char path[PATH_MAX_LOCAL];
    while (q_pop(wa->q, path)) {
        process_file(path);
    }
    return NULL;
}
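
To complete the picture, a minimal driver for the pieces above. The synthetic file-%d names stand in for a real directory walk (e.g. nftw or readdir), and error checks are elided:

int main(void) {
    enum { NWORKERS = 4 };
    pathq_t q;
    pthread_t tids[NWORKERS];
    worker_arg_t wa = { .q = &q };

    q_init(&q);
    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&tids[i], NULL, worker, &wa);

    char path[PATH_MAX_LOCAL];
    for (int i = 0; i < 10000; i++) {       /* stand-in for discovery */
        snprintf(path, sizeof path, "file-%d", i);
        q_push(&q, path);                   /* blocks when the queue is full */
    }

    q_close(&q);                            /* let workers drain and exit */
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(tids[i], NULL);
    return 0;
}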

This is the backbone: the bounded queue is what provides backpressure. When workers fall behind, q_push blocks, so discovery slows down instead of buffering paths without limit.

4) Zig: pipeline with bounded queue

const std = @import("std");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const a = gpa.allocator();

    // Statically sized FIFO of path slices: a bounded queue in spirit,
    // but LinearFifo is not thread-safe on its own. For production you
    // would also store owned copies of the paths; this is conceptual.
    var q = std.fifo.LinearFifo([]const u8, .{ .Static = 1024 }).init();
    defer q.deinit();

    try q.writeItem("example/path");
    _ = q.readItem();
    _ = a; // a real producer would dupe paths with this allocator
}

Zig note: a full, robust pipeline needs owned buffers, careful lifetimes, and either mutexes/condition variables or atomics around the queue.

5) Rust: bounded channels + rayon-style partitioning

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // Bounded channel: send() blocks once 1024 paths are in flight,
    // which is the backpressure.
    let (tx, rx) = mpsc::sync_channel::<String>(1024);
    // std's Receiver is not Clone, so the workers share one behind a mutex.
    let rx = Arc::new(Mutex::new(rx));

    // workers
    let mut handles = Vec::new();
    for _ in 0..4 {
        let rx = Arc::clone(&rx);
        handles.push(thread::spawn(move || loop {
            // Hold the lock only long enough to pull one path.
            let msg = rx.lock().unwrap().recv();
            match msg {
                Ok(path) => {
                    // read/parse/process
                    let _ = path;
                }
                Err(_) => break, // channel closed and drained
            }
        }));
    }

    // producer
    for i in 0..10_000 {
        tx.send(format!("file-{}", i)).unwrap();
    }
    drop(tx); // close the channel so workers see Err and exit

    for h in handles {
        h.join().unwrap();
    }
}

If the Arc<Mutex<Receiver>> dance feels heavy, crossbeam-channel provides bounded channels with cloneable receivers, and rayon's parallel iterators cover the CPU-bound parse stage with less plumbing.

6) Practical tips

  • Batch small files to reduce syscall overhead.
  • Limit the number of concurrently open files.
  • Use pread in multi-threaded readers if offsets matter (see the sketch after this list).
  • Measure with perf stat and iostat.
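
Expanding on the pread tip, a sketch of offset partitioning: each reader thread gets its own byte range and buffer, so no thread touches the shared file position (read_range and the chunk size are illustrative, not a fixed API):

#include <stdlib.h>
#include <unistd.h>

#define CHUNK (1 << 20)  /* 1 MiB per read; tune against queue depth */

/* Read [start, end) of an already-open fd. pread() takes an explicit
 * offset, so concurrent threads never race on a shared lseek cursor. */
static void read_range(int fd, off_t start, off_t end) {
    char *buf = malloc(CHUNK);
    if (!buf) return;
    for (off_t off = start; off < end; off += CHUNK) {
        size_t want = (size_t)(end - off < CHUNK ? end - off : CHUNK);
        ssize_t n = pread(fd, buf, want, off);
        if (n <= 0) break;   /* EOF or error: stop this range */
        /* parse buf[0..n) here */
    }
    free(buf);
}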
