How to Batch Process Emails with Webhooks

Handle hundreds of inbound emails per minute with a queued webhook architecture. Learn how to buffer deliveries, process in parallel workers, and bulk-write to your database efficiently.

Table of Contents
  1. Overview
  2. Prerequisites
  3. Step-by-Step Instructions
  4. Code Example
  5. Common Pitfalls
  6. Frequently Asked Questions

Overview

At high email volumes, processing each webhook delivery synchronously in the HTTP handler becomes a bottleneck. A queued batch processing architecture decouples receipt (accepting the webhook) from processing (extracting data, writing to the database, calling downstream APIs), enabling much higher throughput.

The architecture looks like this:

  1. Webhook handler (thin) — verifies signature, pushes raw payload to queue, returns 200 immediately
  2. Queue (Redis + BullMQ, SQS, RabbitMQ) — buffers incoming deliveries
  3. Worker pool (N parallel workers) — processes jobs from the queue concurrently
  4. Batch writer (optional) — accumulates extracted records and bulk-inserts to the database periodically

This separation lets your webhook handler respond quickly (typically well under 100ms) regardless of processing load, and lets your worker pool scale independently of your web server.

Prerequisites

Batch processing prerequisites:

  • A message queue: Redis + BullMQ (Node.js), Celery + Redis (Python), or a managed queue like SQS or Cloud Tasks
  • A worker process separate from your web server
  • A database that supports bulk inserts efficiently (PostgreSQL, MySQL, ClickHouse for analytics)
  • Metrics and monitoring for queue depth and worker throughput


Step-by-Step Instructions

Set up batch email processing:

  1. Install BullMQ and Redis:
    npm install bullmq ioredis
  2. Create a queue and add jobs in the webhook handler (mount a raw body parser such as express.raw() on this route so req.body holds the exact bytes for signature verification):
    const emailQueue = new Queue("email-processing", { connection: redis });
    
    app.post("/webhooks/email", async (req, res) => {
      // ... verify signature against the raw body ...
      const payload = JSON.parse(req.body.toString());
      await emailQueue.add("process", payload, {
        attempts: 3,
        backoff: { type: "exponential", delay: 1000 },
      });
      res.sendStatus(200);
    });
  3. Implement workers that process queue jobs:
    const worker = new Worker("email-processing", async (job) => {
      const extracted = extractData(job.data.email);
      // db.insert and extractData stand in for your own DB client and parsing logic
      await db.insert("email_data", extracted);
    }, { connection: redis, concurrency: 10 });
  4. For bulk inserts, accumulate records in a buffer and flush every N records or every T seconds.
  5. Monitor queue depth — alert if depth exceeds a threshold indicating worker saturation.
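The saturation check in step 5 reduces to a pure threshold function; how you fetch the counts depends on your queue (with BullMQ, queue.getJobCounts("waiting", "active") returns them). The threshold below is an illustrative assumption, not a recommendation:

```typescript
// Decide whether the queue is saturated, given current job counts.
// With BullMQ you would obtain the counts via:
//   const counts = await queue.getJobCounts("waiting", "active");
interface QueueCounts {
  waiting: number; // jobs not yet picked up by a worker
  active: number;  // jobs currently being processed
}

function isSaturated(counts: QueueCounts, maxBacklog: number): boolean {
  // Backlog = jobs waiting plus jobs in flight
  return counts.waiting + counts.active > maxBacklog;
}

// Illustrative threshold: alert when more than 1,000 jobs are pending
console.log(isSaturated({ waiting: 1200, active: 20 }, 1000)); // true
console.log(isSaturated({ waiting: 50, active: 10 }, 1000));   // false
```

Run this check on a timer (or export it to your metrics system) and page when it stays true for several consecutive intervals, which distinguishes sustained saturation from a brief spike.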

Code Example

BullMQ worker with batch PostgreSQL insert:

import { Worker, Queue } from "bullmq";
import { Pool } from "pg";
import Redis from "ioredis";

// BullMQ workers require maxRetriesPerRequest: null on the ioredis connection
const redis = new Redis(process.env.REDIS_URL!, { maxRetriesPerRequest: null });
const db = new Pool({ connectionString: process.env.DATABASE_URL });

interface EmailRecord {
  from: string;
  subject: string;
  body: string;
  deliveryId: string;
}

let batch: EmailRecord[] = [];
const BATCH_SIZE = 50;
const FLUSH_INTERVAL_MS = 2000;

async function flushBatch() {
  if (!batch.length) return;
  // Take ownership of the current buffer so concurrent jobs append to a fresh array
  const toFlush = batch.splice(0, batch.length);

  // Build one parameterized multi-row VALUES list: 4 placeholders per record
  const values = toFlush.map((_r, i) =>
    `($${i * 4 + 1}, $${i * 4 + 2}, $${i * 4 + 3}, $${i * 4 + 4})`
  ).join(", ");
  const params = toFlush.flatMap(r => [r.from, r.subject, r.body, r.deliveryId]);

  try {
    await db.query(
      `INSERT INTO emails (from_address, subject, body, delivery_id) VALUES ${values} ON CONFLICT (delivery_id) DO NOTHING`,
      params
    );
    console.log(`Flushed ${toFlush.length} emails to database`);
  } catch (err) {
    // Put the records back so the next flush retries them instead of dropping them
    batch.unshift(...toFlush);
    throw err;
  }
}

// Flush on a timer; catch errors so a failed flush doesn't become an unhandled rejection
setInterval(() => {
  flushBatch().catch(err => console.error("Batch flush failed:", err));
}, FLUSH_INTERVAL_MS);

const worker = new Worker("email-processing", async (job) => {
  const { email, deliveryId } = job.data;
  batch.push({
    from:       email.from,
    subject:    email.subject,
    body:       (email.textBody ?? "").slice(0, 1000),
    deliveryId,
  });
  // Note: the job completes once buffered, so records still in the buffer are
  // lost if the process crashes before a flush (see Common Pitfalls)
  if (batch.length >= BATCH_SIZE) await flushBatch();
}, { connection: redis, concurrency: 20 });
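The placeholder arithmetic in the flush above generalizes: row i of a c-column insert occupies placeholders $(i*c + 1) through $(i*c + c). A small helper (hypothetical, not part of the example above) makes the pattern explicit:

```typescript
// Build a PostgreSQL multi-row VALUES placeholder list, e.g.
// rows = 2, cols = 3  →  "($1, $2, $3), ($4, $5, $6)"
function buildPlaceholders(rows: number, cols: number): string {
  const rowStrings: string[] = [];
  for (let i = 0; i < rows; i++) {
    // Row i uses placeholders $(i*cols + 1) .. $(i*cols + cols)
    const ph = Array.from({ length: cols }, (_, j) => `$${i * cols + j + 1}`);
    rowStrings.push(`(${ph.join(", ")})`);
  }
  return rowStrings.join(", ");
}

console.log(buildPlaceholders(2, 4));
// ($1, $2, $3, $4), ($5, $6, $7, $8)
```

Keeping every value in the params array (rather than interpolating strings into the SQL) is what makes the bulk insert safe against SQL injection.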

Common Pitfalls

Batch processing pitfalls:

  • Losing jobs on process crash. In-memory batches are lost if the worker crashes before flushing. Use a queue's built-in persistence (BullMQ stores jobs in Redis) and only mark jobs as complete after the database write confirms success.
  • Unbounded queue growth. If workers are slower than the arrival rate, the queue grows indefinitely. Monitor queue depth and add more worker processes when depth exceeds a threshold. Set a maximum queue size and drop or DLQ excess jobs.
  • No dead-letter queue (DLQ). Jobs that fail all retries disappear unless you configure a DLQ. Route failed jobs to a DLQ and alert so they can be investigated and replayed.
  • Processing order not guaranteed. Queue workers process jobs concurrently and may not maintain arrival order. If order matters (e.g., email thread replies must be processed after the parent), use ordered queues or add sorting logic in your batch writer.
  • Bulk insert conflicts. When retrying jobs, the same email may be inserted twice. Use ON CONFLICT DO NOTHING with a unique constraint on delivery_id to make bulk inserts idempotent.
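One cheap mitigation for the ordering pitfall: sort the accumulated batch by a timestamp before flushing, so rows land in arrival order within each batch. This sketch assumes each record carries a receivedAt field, which the earlier example does not include:

```typescript
interface TimestampedRecord {
  deliveryId: string;
  receivedAt: string; // ISO-8601 timestamp, assumed present on the payload
}

// Sort a batch in place so earlier emails are written first within one flush.
// ISO-8601 strings sort correctly as plain strings. Note this only orders rows
// inside a single batch; cross-batch order still depends on which worker
// picked each job up first.
function sortBatchByArrival<T extends TimestampedRecord>(batch: T[]): T[] {
  return batch.sort((a, b) => a.receivedAt.localeCompare(b.receivedAt));
}

const out = sortBatchByArrival([
  { deliveryId: "b", receivedAt: "2024-01-01T00:00:05Z" },
  { deliveryId: "a", receivedAt: "2024-01-01T00:00:01Z" },
]);
console.log(out.map(r => r.deliveryId)); // [ 'a', 'b' ]
```

For hard ordering requirements (e.g. thread replies), per-thread grouping or an ordered queue is still needed; sorting only reduces how often out-of-order writes occur.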

Frequently Asked Questions

What queue should I use for batch email processing?

Redis + BullMQ is the most popular choice for Node.js due to its simplicity, reliability, and rich feature set (retries, delays, rate limiting, concurrency controls). Python developers typically use Celery + Redis or Celery + RabbitMQ. For managed, serverless queuing, AWS SQS or Google Cloud Tasks are excellent options that eliminate infrastructure management.

How do I scale worker concurrency without overwhelming my database?

Use a connection pool for your database client and size the pool to match your database server's capacity. Set worker concurrency (BullMQ's concurrency option) to match the pool size. Use a rate limiter on the queue to cap jobs processed per second if your downstream database or API has rate limits.
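As a rough sizing sketch (the numbers are illustrative assumptions, not recommendations): if your database allows N client connections and you run P worker processes, give each process a pool of about N / P connections, minus a reserve for the web server and admin tools, and set worker concurrency no higher than that so every in-flight job can hold a connection without queuing on the pool:

```typescript
// Rough per-process pool sizing: split the database's connection budget
// across worker processes, reserving some connections for the web server
// and admin tooling.
function poolSizePerWorker(
  dbMaxConnections: number,
  workerProcesses: number,
  reserved = 10
): number {
  const available = dbMaxConnections - reserved;
  return Math.max(1, Math.floor(available / workerProcesses));
}

// e.g. Postgres max_connections = 100, 4 worker processes, 10 reserved
console.log(poolSizePerWorker(100, 4)); // 22
```

With node-postgres, pass the result as the pool's max option and use the same number for BullMQ's concurrency in that process.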

Is batch processing necessary for low-volume pipelines?

No. For under ~10 emails per minute, synchronous processing in the webhook handler is perfectly adequate. The queued batch architecture adds operational complexity and is worth adopting when: you need sub-100ms webhook handler response times, you are processing more than ~30 emails per minute, or your processing involves slow downstream calls.

Can I use serverless functions (Lambda, Cloud Functions) as my worker pool?

Yes. SQS + Lambda is a popular serverless batch processing pattern. JsonHook delivers to an API Gateway endpoint that writes to SQS, and Lambda functions process SQS messages in batches. This architecture scales automatically and has no idle cost but introduces cold start latency that must be acceptable for your use case.
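A minimal sketch of the Lambda side of that pattern, assuming the event source mapping is configured with ReportBatchItemFailures so one bad message doesn't force a retry of the whole batch (types are inlined here rather than pulled from @types/aws-lambda):

```typescript
// Minimal SQS batch handler shape. With ReportBatchItemFailures enabled,
// returning an itemIdentifier tells SQS to redeliver only that message.
interface SQSRecord {
  messageId: string;
  body: string;
}
interface SQSEvent {
  Records: SQSRecord[];
}

async function handler(event: SQSEvent) {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of event.Records) {
    try {
      const payload = JSON.parse(record.body);
      // ... extract data, bulk-write to the database, etc. ...
      void payload;
    } catch {
      // Malformed or failed message: report it so only this one is retried
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
}
```

Without partial batch responses, any single failure would cause SQS to redeliver every message in the batch, which multiplies the need for the idempotent-insert safeguards described under Common Pitfalls.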