The pitch for OTP is always the same: "let it crash," nine nines of uptime, Erlang running phone switches. That's all true and also completely useless when you're staring at a blank mix new project wondering how to actually structure the thing.
This tutorial skips the theory tour. We build something real: a distributed job processing system where a Node.js API enqueues work into Redis, and an Elixir/OTP application consumes it — with a supervision tree that keeps the whole thing running when individual workers die, when Redis blips, and when a job payload is malformed.
By the end you'll have:
- A Node.js producer API with Redis streams (not just lists — we want consumer groups)
- An Elixir Application with a proper OTP supervision tree
- A QueueConsumer GenServer that polls Redis and dispatches work
- A WorkerSupervisor (DynamicSupervisor) that spawns and monitors per-job workers
- A JobWorker GenServer that processes a job, retries on failure, and dead-letters after max attempts
- Telemetry integration so you can see what's actually happening
No Oban, no Exq. We're building the layer below so you understand what those libraries are doing.
Architecture Overview
┌─────────────────────────────────────────────────────────┐
│                  Node.js Producer API                   │
│    POST /jobs  →  Redis XADD  →  Stream: "jobs:work"    │
└─────────────────────────────────────────────────────────┘
                            │  Redis Streams
                            ▼
┌─────────────────────────────────────────────────────────┐
│                 Elixir OTP Application                  │
│                                                         │
│  Application (supervisor)                               │
│    ├── RedisPool (Redix connections)                    │
│    ├── WorkerSupervisor (DynamicSupervisor)             │
│    │     ├── JobWorker<job_id_1> (GenServer)            │
│    │     ├── JobWorker<job_id_2> (GenServer)            │
│    │     └── JobWorker<job_id_n> (GenServer)            │
│    └── QueueConsumer (GenServer — polls + dispatches)   │
└─────────────────────────────────────────────────────────┘
Redis Streams give us persistent, consumer-group-aware queuing. A job isn't acknowledged until the worker finishes it — crash the worker mid-job and Redis redelivers it on restart.
Part 1: The Node.js Producer
Project Setup
mkdir job-producer && cd job-producer
npm init -y
npm install express ioredis ulid zod
Redis Stream Producer
We use XADD to append jobs to a Redis stream. Unlike LPUSH/RPUSH, streams give us:
- Persistent, ordered log of all jobs (not consumed on read)
- Consumer groups (multiple consumers, each gets different jobs)
- Built-in pending entry list (PEL) — unacknowledged jobs are trackable
// src/redis.js
const Redis = require('ioredis');
const redis = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
maxRetriesPerRequest: 3,
retryStrategy: (times) => Math.min(times * 50, 2000),
lazyConnect: false,
});
redis.on('error', (err) => console.error('[redis] error:', err.message));
redis.on('connect', () => console.log('[redis] connected'));
module.exports = redis;
// src/jobs.js
const { ulid } = require('ulid');
const redis = require('./redis');
const { z } = require('zod');
const STREAM_KEY = 'jobs:work';
const MAX_LEN = 10_000; // cap stream length, trim old entries
// Job schema — validate before enqueuing
const JobSchema = z.object({
type: z.enum(['email', 'report', 'webhook']), // must match the consumer's execute_job clauses
payload: z.record(z.unknown()),
priority: z.number().int().min(1).max(10).default(5),
});
async function enqueueJob(rawInput) {
const parsed = JobSchema.parse(rawInput); // throws ZodError if invalid
const job = {
id: ulid(), // sortable, unique job ID
type: parsed.type,
payload: JSON.stringify(parsed.payload),
priority: String(parsed.priority),
enqueued_at: new Date().toISOString(),
attempts: '0',
};
// XADD key [MAXLEN ~ n] * field value [field value ...]
// '*' tells Redis to auto-generate the stream entry ID
const entryId = await redis.xadd(
STREAM_KEY,
'MAXLEN', '~', String(MAX_LEN),
'*', // auto-ID
...Object.entries(job).flat() // field-value pairs
);
console.log(`[jobs] enqueued ${job.type} job=${job.id} entry=${entryId}`);
return { jobId: job.id, streamEntryId: entryId };
}
async function getJobStats() {
const [length, groups] = await Promise.all([
redis.xlen(STREAM_KEY),
redis.xinfo('GROUPS', STREAM_KEY).catch(() => []),
]);
return { stream: STREAM_KEY, length, consumerGroups: groups };
}
module.exports = { enqueueJob, getJobStats };
The API
// src/index.js
const express = require('express');
const { enqueueJob, getJobStats } = require('./jobs');
const app = express();
app.use(express.json());
// POST /jobs — enqueue a new job
app.post('/jobs', async (req, res) => {
try {
const result = await enqueueJob(req.body);
res.status(202).json({ status: 'accepted', ...result });
} catch (err) {
if (err.name === 'ZodError') {
return res.status(422).json({ error: 'Invalid job', issues: err.issues });
}
console.error('[api] enqueue error:', err);
res.status(500).json({ error: 'Internal error' });
}
});
// GET /jobs/stats — queue depth and consumer group info
app.get('/jobs/stats', async (req, res) => {
  try {
    const stats = await getJobStats();
    res.json(stats);
  } catch (err) {
    // Express 4 doesn't catch async rejections for us — handle them explicitly
    console.error('[api] stats error:', err);
    res.status(500).json({ error: 'Internal error' });
  }
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => console.log(`[api] listening on :${PORT}`));
Test it:
node src/index.js &
curl -s -X POST http://localhost:3000/jobs \
-H 'Content-Type: application/json' \
-d '{"type":"email","payload":{"to":"user@example.com","template":"welcome"}}' \
| jq .
# => { "status": "accepted", "jobId": "01HXYZ...", "streamEntryId": "1699...-0" }
Part 2: The Elixir/OTP Consumer
Project Setup
mix new job_consumer --sup # --sup scaffolds an Application module
cd job_consumer
The --sup flag is important — it generates a JobConsumer.Application module with a supervision tree stub. We'll fill that in.
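The generated stub looks roughly like this (the exact template varies by Elixir version):

# lib/job_consumer/application.ex (as generated; we replace it below)
defmodule JobConsumer.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = []
    opts = [strategy: :one_for_one, name: JobConsumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Now add the dependencies: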
# mix.exs
defp deps do
[
{:redix, "~> 1.4"},
{:poolboy, "~> 1.5"},
{:jason, "~> 1.4"},
{:telemetry, "~> 1.2"},
{:telemetry_metrics, "~> 0.6"},
]
end
mix deps.get
Configuration
# config/config.exs
# Note: config.exs is evaluated at compile time. That's fine for `mix run` in
# this tutorial; in a release, move these System.get_env reads to config/runtime.exs.
import Config
config :job_consumer,
redis_host: System.get_env("REDIS_HOST", "localhost"),
redis_port: String.to_integer(System.get_env("REDIS_PORT", "6379")),
stream_key: System.get_env("STREAM_KEY", "jobs:work"),
consumer_group: System.get_env("CONSUMER_GROUP", "elixir-workers"),
consumer_name: System.get_env("CONSUMER_NAME", "consumer-#{:inet.gethostname() |> elem(1)}"),
max_concurrency: String.to_integer(System.get_env("MAX_CONCURRENCY", "10")),
poll_interval_ms: String.to_integer(System.get_env("POLL_INTERVAL_MS", "100")),
max_attempts: String.to_integer(System.get_env("MAX_ATTEMPTS", "3"))
The Supervision Tree
This is the heart of the OTP design. Get this right and everything else is pluggable.
# lib/job_consumer/application.ex
defmodule JobConsumer.Application do
use Application
require Logger
@impl true
def start(_type, _args) do
config = Application.get_all_env(:job_consumer)
children = [
# 1. Redis connection pool — must start before anything that uses Redis
{JobConsumer.RedisPool, config},
# 2. Dynamic supervisor — spawns/monitors per-job worker processes.
# Must be running before the QueueConsumer dispatches to it; children
# start in list order, so supervision order matters here.
{JobConsumer.WorkerSupervisor, config},
# 3. Queue consumer — polls Redis, dispatches to WorkerSupervisor.
# Depends on RedisPool and WorkerSupervisor both being up.
{JobConsumer.QueueConsumer, config},
# 4. Telemetry — attach handlers after everything else is up
JobConsumer.Telemetry,
]
# :one_for_one — if one child crashes, only restart that child
# This is correct here: a crashed QueueConsumer shouldn't kill the WorkerSupervisor
# and vice versa. The RedisPool restart policy handles reconnection.
opts = [
strategy: :one_for_one,
name: JobConsumer.Supervisor,
max_restarts: 10,
max_seconds: 60,
]
Logger.info("[app] starting supervision tree")
Supervisor.start_link(children, opts)
end
end
A note on strategy choice: :one_for_one is right here because our children are loosely coupled — the QueueConsumer and WorkerSupervisor don't share state. If we had children where a crash in one makes the others' state invalid, we'd use :one_for_all (restart everyone) or :rest_for_one (restart the crashed child and all children started after it).
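For contrast, here is a sketch of :rest_for_one, under the hypothetical assumption that the QueueConsumer held in-memory state that is only valid while the RedisPool is alive:

# Hypothetical variant, not used in this build: with :rest_for_one, a
# RedisPool crash restarts RedisPool *and* every child listed after it,
# while a QueueConsumer crash restarts only itself and later children.
children = [
  {JobConsumer.RedisPool, config},
  {JobConsumer.WorkerSupervisor, config},
  {JobConsumer.QueueConsumer, config},
]

Supervisor.start_link(children, strategy: :rest_for_one, name: JobConsumer.Supervisor)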
The Redis Connection Pool
We use Poolboy to maintain a pool of Redix connections. One connection handles one command at a time; pooling gives us concurrency.
# lib/job_consumer/redis_pool.ex
defmodule JobConsumer.RedisPool do
  @pool_name :redix_pool

  def child_spec(config) do
    pool_opts = [
      name: {:local, @pool_name},
      worker_module: JobConsumer.RedisWorker,
      size: 10,        # idle connections
      max_overflow: 5, # burst connections
    ]

    redis_opts = [
      host: config[:redis_host],
      port: config[:redis_port],
    ]

    :poolboy.child_spec(@pool_name, pool_opts, redis_opts)
  end

  # Execute a Redis command, borrowing a connection from the pool.
  # The pooled worker is a RedisWorker GenServer wrapping a Redix connection,
  # so we go through GenServer.call/2 rather than calling Redix on the worker pid.
  def command(cmd) do
    :poolboy.transaction(@pool_name, fn worker ->
      GenServer.call(worker, {:command, cmd})
    end)
  end

  # Pipeline multiple commands in one round trip
  def pipeline(cmds) do
    :poolboy.transaction(@pool_name, fn worker ->
      GenServer.call(worker, {:pipeline, cmds})
    end)
  end
end
# lib/job_consumer/redis_worker.ex
defmodule JobConsumer.RedisWorker do
  use GenServer

  def start_link(redis_opts) do
    GenServer.start_link(__MODULE__, redis_opts)
  end

  @impl true
  def init(opts) do
    host = Keyword.get(opts, :host, "localhost")
    port = Keyword.get(opts, :port, 6379)

    case Redix.start_link(host: host, port: port) do
      {:ok, conn} -> {:ok, conn}
      {:error, reason} -> {:stop, reason}
    end
  end

  # Run a single command on this worker's Redix connection
  @impl true
  def handle_call({:command, cmd}, _from, conn) do
    {:reply, Redix.command(conn, cmd), conn}
  end

  # Run a pipeline on this worker's Redix connection
  def handle_call({:pipeline, cmds}, _from, conn) do
    {:reply, Redix.pipeline(conn, cmds), conn}
  end
end
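A quick sanity check from iex (output illustrative):

iex> JobConsumer.RedisPool.command(["PING"])
{:ok, "PONG"}
iex> JobConsumer.RedisPool.pipeline([["SET", "demo", "1"], ["INCR", "demo"]])
{:ok, ["OK", 2]}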
Ensuring the Consumer Group Exists
Redis consumer groups must be created before XREADGROUP can be called. We do this at startup, in the QueueConsumer's init:
# lib/job_consumer/stream.ex
defmodule JobConsumer.Stream do
require Logger
@doc """
Ensure the consumer group exists on the stream.
XGROUP CREATE with MKSTREAM creates the stream if it doesn't exist yet.
'$' means 'start from new messages only' (use '0' to reprocess all).
"""
def ensure_consumer_group!(stream_key, group_name) do
case JobConsumer.RedisPool.command(
["XGROUP", "CREATE", stream_key, group_name, "$", "MKSTREAM"]
) do
{:ok, "OK"} ->
Logger.info("[stream] created consumer group '#{group_name}' on '#{stream_key}'")
{:error, %Redix.Error{message: "BUSYGROUP" <> _}} ->
# Group already exists — this is fine, not an error
:ok
{:error, reason} ->
raise "Failed to create consumer group: #{inspect(reason)}"
end
end
@doc """
Read up to `count` new messages from the stream via consumer group.
'>' means 'give me messages not yet delivered to any consumer'.
"""
def read_new(stream_key, group_name, consumer_name, count \\ 10) do
JobConsumer.RedisPool.command([
"XREADGROUP",
"GROUP", group_name,
consumer_name,
"COUNT", Integer.to_string(count),
"BLOCK", "0", # block until messages available (ms); 0 = indefinite
"STREAMS", stream_key,
">" # deliver only undelivered messages
])
end
@doc """
Re-claim messages that have been pending (delivered but not acknowledged)
for longer than `min_idle_ms`. Used for crash recovery.
"""
def reclaim_stale(stream_key, group_name, consumer_name, min_idle_ms \\ 30_000) do
JobConsumer.RedisPool.command([
"XAUTOCLAIM",
stream_key,
group_name,
consumer_name,
Integer.to_string(min_idle_ms),
"0-0", # start from beginning of PEL
"COUNT", "100"
])
end
@doc """
Acknowledge a message — removes it from the Pending Entry List.
Call this only after successful processing.
"""
def ack(stream_key, group_name, entry_id) do
JobConsumer.RedisPool.command(["XACK", stream_key, group_name, entry_id])
end
end
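It helps to know the shape Redix hands back from these calls. A successful read_new/4 reply decodes into nested lists: the stream name, then [entry_id, flat_field_list] pairs (values illustrative):

{:ok, [
  ["jobs:work", [
    ["1700000000000-0",
     ["id", "01HXYZ...", "type", "email", "payload", "{\"to\":\"...\"}",
      "priority", "5", "enqueued_at", "2023-...", "attempts", "0"]]
  ]]
]}

That flat field list is why the consumer below pairs fields up with Enum.chunk_every/2.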
The QueueConsumer GenServer
This is the poller. It wakes up, reads a batch of jobs from Redis, spawns a JobWorker for each via the WorkerSupervisor, and loops.
# lib/job_consumer/queue_consumer.ex
defmodule JobConsumer.QueueConsumer do
use GenServer
require Logger
alias JobConsumer.{Stream, WorkerSupervisor}
@reclaim_interval_ms 30_000 # check for stale pending entries every 30s
# ── Public API ─────────────────────────────────────────────────────────────
def start_link(config) do
GenServer.start_link(__MODULE__, config, name: __MODULE__)
end
def status do
GenServer.call(__MODULE__, :status)
end
# ── GenServer Callbacks ────────────────────────────────────────────────────
@impl true
def init(config) do
stream_key = config[:stream_key]
group_name = config[:consumer_group]
consumer_name = config[:consumer_name]
poll_interval = config[:poll_interval_ms]
max_concurrent = config[:max_concurrency]
# Ensure the consumer group exists before we start polling
Stream.ensure_consumer_group!(stream_key, group_name)
state = %{
stream_key: stream_key,
group_name: group_name,
consumer_name: consumer_name,
poll_interval: poll_interval,
max_concurrent: max_concurrent,
dispatched: 0,
errors: 0,
}
# Schedule first poll immediately, then reclaim loop
send(self(), :poll)
Process.send_after(self(), :reclaim_stale, @reclaim_interval_ms)
Logger.info("[consumer] started — group=#{group_name} consumer=#{consumer_name}")
{:ok, state}
end
@impl true
def handle_info(:poll, state) do
# Backpressure: don't read more jobs than we can handle concurrently
active_workers = WorkerSupervisor.active_count()
new_state =
if active_workers >= state.max_concurrent do
Logger.debug("[consumer] at capacity (#{active_workers}/#{state.max_concurrent}), skipping poll")
state
else
read_and_dispatch(state)
end
# Schedule next poll
Process.send_after(self(), :poll, state.poll_interval)
{:noreply, new_state}
end
@impl true
def handle_info(:reclaim_stale, state) do
  # XAUTOCLAIM replies [next_id, entries] on Redis 6.2 and
  # [next_id, entries, deleted] on 7.x; keep the updated counters
  # from dispatch instead of discarding them
  new_state =
    case Stream.reclaim_stale(state.stream_key, state.group_name, state.consumer_name) do
      {:ok, [_next_id, entries | _]} when entries != [] ->
        Logger.warning("[consumer] reclaimed #{length(entries)} stale entries")
        dispatch_entries(entries, state)

      {:ok, _} ->
        state

      {:error, reason} ->
        Logger.error("[consumer] reclaim failed: #{inspect(reason)}")
        state
    end

  Process.send_after(self(), :reclaim_stale, @reclaim_interval_ms)
  {:noreply, new_state}
end
@impl true
def handle_call(:status, _from, state) do
{:reply, Map.take(state, [:dispatched, :errors, :max_concurrent]), state}
end
# ── Private ────────────────────────────────────────────────────────────────
defp read_and_dispatch(state) do
case Stream.read_new(
state.stream_key,
state.group_name,
state.consumer_name,
state.max_concurrent
) do
{:ok, [[_stream_key, entries]]} ->
dispatch_entries(entries, state)
{:ok, nil} ->
# Timeout with no messages — normal
state
{:error, reason} ->
Logger.error("[consumer] read error: #{inspect(reason)}")
%{state | errors: state.errors + 1}
end
end
defp dispatch_entries(entries, state) do
  # Redix decodes each stream entry as an [entry_id, fields] list, not a tuple
  Enum.reduce(entries, state, fn [entry_id, fields], acc ->
    job = parse_job(entry_id, fields)

    case WorkerSupervisor.start_worker(job) do
      {:ok, _pid} ->
        Logger.debug("[consumer] dispatched job=#{job.id} entry=#{entry_id}")
        %{acc | dispatched: acc.dispatched + 1}

      {:error, reason} ->
        Logger.error("[consumer] dispatch failed job=#{job.id}: #{inspect(reason)}")
        %{acc | errors: acc.errors + 1}
    end
  end)
end

defp parse_job(entry_id, fields) do
  # fields arrive as a flat [k1, v1, k2, v2, ...] list, so pair them into a map
  field_map =
    fields
    |> Enum.chunk_every(2)
    |> Map.new(fn [k, v] -> {k, v} end)

  %{
    stream_entry_id: entry_id,
    id: field_map["id"],
    type: field_map["type"],
    payload: Jason.decode!(field_map["payload"]),
    priority: String.to_integer(field_map["priority"] || "5"),
    attempts: String.to_integer(field_map["attempts"] || "0"),
    enqueued_at: field_map["enqueued_at"],
  }
end
end
The backpressure check (active_workers >= state.max_concurrent) is critical. Without it, a burst of 10,000 jobs would spawn 10,000 GenServer processes simultaneously. With it, we cap concurrency and let Redis hold the overflow.
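You can watch the backpressure from an iex shell while a batch is in flight (numbers illustrative):

iex> JobConsumer.QueueConsumer.status()
%{dispatched: 37, errors: 0, max_concurrent: 10}
iex> JobConsumer.WorkerSupervisor.active_count()
10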
The WorkerSupervisor
A DynamicSupervisor that spawns JobWorker processes on demand and supervises them independently.
# lib/job_consumer/worker_supervisor.ex
defmodule JobConsumer.WorkerSupervisor do
use DynamicSupervisor
require Logger
def start_link(config) do
DynamicSupervisor.start_link(__MODULE__, config, name: __MODULE__)
end
@impl true
def init(_config) do
# :one_for_one is the only strategy DynamicSupervisor supports.
# Note: JobWorker runs with restart: :temporary, so crashed workers are never
# restarted and don't count toward this restart intensity; these limits
# guard against repeated start failures, not normal job crashes.
DynamicSupervisor.init(
strategy: :one_for_one,
max_restarts: 3,
max_seconds: 5
)
end
@doc "Spawn a supervised JobWorker for the given job map"
def start_worker(job) do
spec = {JobConsumer.JobWorker, job}
DynamicSupervisor.start_child(__MODULE__, spec)
end
@doc "Count currently active (living) worker processes"
def active_count do
DynamicSupervisor.count_children(__MODULE__).active
end
@doc "List all active worker PIDs"
def list_workers do
DynamicSupervisor.which_children(__MODULE__)
|> Enum.map(fn {_id, pid, _type, _modules} -> pid end)
end
end
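active_count/0 leans on DynamicSupervisor.count_children/1, which returns a breakdown map (counts illustrative):

iex> DynamicSupervisor.count_children(JobConsumer.WorkerSupervisor)
%{active: 3, specs: 3, supervisors: 0, workers: 3}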
The JobWorker GenServer
This is where the actual work happens. Each job gets its own process — isolated heap, isolated failure domain, independent retry logic.
# lib/job_consumer/job_worker.ex
defmodule JobConsumer.JobWorker do
use GenServer, restart: :temporary # don't auto-restart crashed workers
require Logger
alias JobConsumer.{Stream, DeadLetter}
@base_retry_delay_ms 1_000
@max_attempts Application.compile_env(:job_consumer, :max_attempts, 3)
# ── Public API ─────────────────────────────────────────────────────────────
def start_link(job) do
GenServer.start_link(__MODULE__, job)
end
# ── GenServer Callbacks ────────────────────────────────────────────────────
@impl true
def init(job) do
# Process the job immediately after init — don't block the supervisor
send(self(), :process)
{:ok, job}
end
@impl true
def handle_info(:process, job) do
start_time = System.monotonic_time()
:telemetry.execute(
[:job_consumer, :job, :start],
%{system_time: System.system_time()},
%{job_type: job.type, job_id: job.id}
)
result =
try do
{:ok, execute_job(job)}
rescue
e -> {:error, Exception.format(:error, e, __STACKTRACE__)}
catch
:exit, reason -> {:error, "exit: #{inspect(reason)}"}
end
duration = System.monotonic_time() - start_time
case result do
{:ok, _output} ->
handle_success(job, duration)
{:error, reason} ->
handle_failure(job, reason, duration)
end
# Worker is done — stop normally. The supervisor does not restart :temporary workers.
{:stop, :normal, job}
end
# ── Job Dispatch ───────────────────────────────────────────────────────────
defp execute_job(%{type: "email"} = job) do
# Simulate: in production, call your mailer here
%{to: to, template: template} = atomize(job.payload)
Logger.info("[worker] sending email to=#{to} template=#{template}")
Process.sleep(100) # simulate I/O
%{sent_to: to, template: template}
end
defp execute_job(%{type: "report"} = job) do
%{report_id: id} = atomize(job.payload)
Logger.info("[worker] generating report id=#{id}")
Process.sleep(500)
%{report_id: id, rows: :rand.uniform(10_000)}
end
defp execute_job(%{type: "webhook"} = job) do
%{url: url} = atomize(job.payload)
Logger.info("[worker] dispatching webhook to=#{url}")
# In production: HTTP call here; raise on non-2xx for retry
Process.sleep(200)
%{url: url, status: 200}
end
defp execute_job(%{type: type}) do
raise "Unknown job type: #{type}"
end
# ── Success / Failure Handling ─────────────────────────────────────────────
defp handle_success(job, duration_native) do
duration_ms = System.convert_time_unit(duration_native, :native, :millisecond)
# Acknowledge the message — removes it from Redis PEL
case Stream.ack(config(:stream_key), config(:consumer_group), job.stream_entry_id) do
{:ok, 1} ->
Logger.info("[worker] ✓ job=#{job.id} type=#{job.type} duration=#{duration_ms}ms")
{:ok, 0} ->
Logger.warning("[worker] ack returned 0 for job=#{job.id} — already acked?")
{:error, reason} ->
Logger.error("[worker] ack failed job=#{job.id}: #{inspect(reason)}")
end
:telemetry.execute(
[:job_consumer, :job, :success],
%{duration: duration_native},
%{job_type: job.type, job_id: job.id}
)
end
defp handle_failure(job, reason, duration_native) do
attempts = job.attempts + 1
duration_ms = System.convert_time_unit(duration_native, :native, :millisecond)
Logger.error("[worker] ✗ job=#{job.id} type=#{job.type} attempt=#{attempts} reason=#{inspect(reason)}")
:telemetry.execute(
[:job_consumer, :job, :failure],
%{duration: duration_native},
%{job_type: job.type, job_id: job.id, attempt: attempts, reason: reason}
)
if attempts >= @max_attempts do
# Max attempts reached — move to dead letter stream, then ack to clear PEL
Logger.error("[worker] dead-lettering job=#{job.id} after #{attempts} attempts")
DeadLetter.push(job, reason)
Stream.ack(config(:stream_key), config(:consumer_group), job.stream_entry_id)
else
# Exponential backoff retry: re-enqueue with incremented attempts
# We ack the current entry and re-add to the stream with updated attempts count
delay_ms = round(@base_retry_delay_ms * :math.pow(2, attempts))
Logger.info("[worker] retrying job=#{job.id} in #{delay_ms}ms (attempt #{attempts}/#{@max_attempts})")
Stream.ack(config(:stream_key), config(:consumer_group), job.stream_entry_id)
# Re-enqueue after delay — spawn a detached process so we don't block
job_to_retry = %{job | attempts: attempts}
Task.start(fn ->
Process.sleep(delay_ms)
re_enqueue(job_to_retry)
end)
end
end
defp re_enqueue(job) do
fields = [
"id", job.id,
"type", job.type,
"payload", Jason.encode!(job.payload),
"priority", Integer.to_string(job.priority),
"enqueued_at", job.enqueued_at,
"attempts", Integer.to_string(job.attempts),
]
JobConsumer.RedisPool.command(
["XADD", config(:stream_key), "MAXLEN", "~", "10000", "*" | fields]
)
end
# String.to_existing_atom (vs String.to_atom) protects the atom table from
# arbitrary payload keys; an unknown key raises, which the try/rescue in
# handle_info(:process, ...) turns into a retry and, eventually, a dead letter.
defp atomize(map) do
  Map.new(map, fn {k, v} -> {String.to_existing_atom(k), v} end)
end
defp config(key), do: Application.fetch_env!(:job_consumer, key)
end
The restart: :temporary option on use GenServer is essential. It tells the WorkerSupervisor not to automatically restart a worker that exits — whether normally or abnormally. We want full control over retry logic inside the worker itself. Auto-restart would bypass our backoff and dead-letter logic.
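You can see the effect in the child spec that use GenServer generates (job map truncated for illustration):

iex> JobConsumer.JobWorker.child_spec(%{id: "01HX...", type: "email"})
%{
  id: JobConsumer.JobWorker,
  restart: :temporary,
  start: {JobConsumer.JobWorker, :start_link, [%{id: "01HX...", type: "email"}]}
}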
Dead Letter Queue
Jobs that exhaust retries go here for inspection, not into the void:
# lib/job_consumer/dead_letter.ex
defmodule JobConsumer.DeadLetter do
require Logger
@stream_key "jobs:dead"
def push(job, reason) do
fields = [
"original_id", job.id,
"type", job.type,
"payload", Jason.encode!(job.payload),
"attempts", Integer.to_string(job.attempts),
"failed_at", DateTime.utc_now() |> DateTime.to_iso8601(),
"reason", inspect(reason),
]
case JobConsumer.RedisPool.command(["XADD", @stream_key, "*" | fields]) do
{:ok, entry_id} ->
Logger.info("[dead_letter] stored job=#{job.id} at entry=#{entry_id}")
{:ok, entry_id}
{:error, reason} ->
Logger.error("[dead_letter] failed to store job=#{job.id}: #{inspect(reason)}")
{:error, reason}
end
end
def list(count \\ 100) do
case JobConsumer.RedisPool.command(["XRANGE", @stream_key, "-", "+", "COUNT", Integer.to_string(count)]) do
{:ok, entries} -> {:ok, Enum.map(entries, &parse_entry/1)}
error -> error
end
end
# XRANGE entries decode as [entry_id, fields] lists, not tuples
defp parse_entry([entry_id, fields]) do
  field_map =
    fields
    |> Enum.chunk_every(2)
    |> Map.new(fn [k, v] -> {k, v} end)

  Map.put(field_map, "entry_id", entry_id)
end
end
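Dead-lettered jobs are only useful if you can replay them. Here is a hypothetical replay/1 you could add to this module, strictly a sketch: it copies a dead-lettered entry back onto the work stream with its attempt counter reset, then removes it from jobs:dead.

# Hypothetical helper, not wired into anything above.
# Field names match what push/2 writes.
def replay(entry_id) do
  with {:ok, [[^entry_id, fields]]} <-
         JobConsumer.RedisPool.command(["XRANGE", @stream_key, entry_id, entry_id]) do
    entry = fields |> Enum.chunk_every(2) |> Map.new(fn [k, v] -> {k, v} end)
    work_stream = Application.fetch_env!(:job_consumer, :stream_key)

    JobConsumer.RedisPool.command([
      "XADD", work_stream, "*",
      "id", entry["original_id"],
      "type", entry["type"],
      "payload", entry["payload"],
      "priority", "5",
      "enqueued_at", DateTime.to_iso8601(DateTime.utc_now()),
      "attempts", "0"
    ])

    JobConsumer.RedisPool.command(["XDEL", @stream_key, entry_id])
  end
end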
Telemetry
Wire up metrics so you actually know what's happening:
# lib/job_consumer/telemetry.ex
defmodule JobConsumer.Telemetry do
use GenServer
require Logger
def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init(_) do
events = [
[:job_consumer, :job, :start],
[:job_consumer, :job, :success],
[:job_consumer, :job, :failure],
]
:telemetry.attach_many(
"job-consumer-logger",
events,
&__MODULE__.handle_event/4,
nil
)
{:ok, %{processed: 0, failed: 0}}
end
def handle_event([:job_consumer, :job, :start], _measurements, meta, _config) do
Logger.debug("[telemetry] job started type=#{meta.job_type} id=#{meta.job_id}")
end
def handle_event([:job_consumer, :job, :success], measurements, meta, _config) do
duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
Logger.info("[telemetry] job success type=#{meta.job_type} id=#{meta.job_id} duration=#{duration_ms}ms")
# In production: emit to StatsD, Prometheus, Datadog, etc.
end
def handle_event([:job_consumer, :job, :failure], _measurements, meta, _config) do
Logger.warning("[telemetry] job failure type=#{meta.job_type} id=#{meta.job_id} attempt=#{meta.attempt}")
end
end
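The telemetry_metrics dependency is already in mix.exs; here is a sketch of metric definitions you could hand to a reporter (TelemetryMetricsPrometheus, StatsD, etc., none of which are in our deps), e.g. inside a hypothetical JobConsumer.Metrics module:

import Telemetry.Metrics

def metrics do
  [
    counter("job_consumer.job.success.count", tags: [:job_type]),
    counter("job_consumer.job.failure.count", tags: [:job_type]),
    distribution("job_consumer.job.success.duration",
      unit: {:native, :millisecond},
      tags: [:job_type]
    )
  ]
end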
Watching the Supervision Tree in Action
Start the Elixir application:
mix run --no-halt
In another terminal, enqueue a batch of jobs:
for i in $(seq 1 20); do
curl -s -X POST http://localhost:3000/jobs \
-H 'Content-Type: application/json' \
-d "{\"type\":\"email\",\"payload\":{\"to\":\"user${i}@example.com\",\"template\":\"welcome\"}}" \
> /dev/null
done
echo "Enqueued 20 jobs"
Observe the Elixir logs — you'll see workers spawning, processing, and acknowledging:
[consumer] dispatched job=01HX... entry=1699...-0
[worker] sending email to=user1@example.com template=welcome
[worker] ✓ job=01HX... type=email duration=103ms
[consumer] dispatched job=01HY... entry=1699...-1
...
Now simulate a crash. Restart the app under iex (iex -S mix) so you have a shell, then:
# Kill the QueueConsumer process directly
Process.whereis(JobConsumer.QueueConsumer) |> Process.exit(:kill)
# The Application supervisor restarts it automatically within milliseconds
# Watch the logs:
# [consumer] started — group=elixir-workers consumer=consumer-hostname
The supervision tree just restarted the consumer. Any jobs that were mid-flight but not yet acknowledged are still in the Redis PEL — the reclaim_stale loop will pick them up on the next cycle.
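You can verify from iex that unacknowledged work really is sitting in the PEL. The XPENDING summary reply is [count, min_entry_id, max_entry_id, per-consumer counts] (values illustrative):

iex> JobConsumer.RedisPool.command(["XPENDING", "jobs:work", "elixir-workers"])
{:ok, [3, "1700000000000-0", "1700000000002-0", [["consumer-myhost", "3"]]]}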
The Failure Matrix
| Failure | What happens | Recovery |
|---|---|---|
| JobWorker crashes mid-job | Job stays in Redis PEL (not acked) | XAUTOCLAIM reclaims after 30s |
| JobWorker raises exception | try/rescue catches it, retry logic runs | Exponential backoff, then dead letter |
| QueueConsumer crashes | App supervisor restarts it | Polls resume; PEL intact in Redis |
| WorkerSupervisor crashes | App supervisor restarts it | All workers lost; PEL covers in-flight jobs |
| Redis connection drops | Redix auto-reconnects; pool returns errors | Consumer logs errors, retries next poll |
| Job exceeds max_attempts | Moved to jobs:dead stream, PEL cleared | Manual inspection + replay |
| Burst of jobs | Backpressure check caps concurrency | Overflow sits in Redis stream safely |
Every cell in that table has code behind it in what we built. None of it relies on hope.
Where to Take It Next
- Priorities: Add a separate stream per priority level (jobs:high, jobs:normal, jobs:low). Poll high-priority first; fall through to lower streams only when high is empty (see the sketch after this list).
- Observability: Replace the Telemetry logger with a Prometheus exporter. Track queue depth (Redis XLEN), processing rate, p99 duration per job type.
- Horizontal scaling: Run multiple Elixir nodes. Each gets a unique consumer_name. Redis consumer groups handle deduplication automatically — no coordinator needed.
- Rate limiting: Add a RateLimiter GenServer that tracks jobs-per-second per job type and blocks the QueueConsumer dispatch when limits are hit.
- Job cancellation: XDEL a stream entry by ID before it's claimed. Workers should check a cancellation flag at the start of execute_job.
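A sketch of that priority polling, assuming the hypothetical streams jobs:high / jobs:normal / jobs:low (each needs its own XGROUP CREATE):

# XREADGROUP accepts several streams in one call; replies come back grouped
# per stream, so entries from jobs:high can be dispatched before the rest.
def read_by_priority(group, consumer, count) do
  JobConsumer.RedisPool.command([
    "XREADGROUP", "GROUP", group, consumer,
    "COUNT", Integer.to_string(count),
    "STREAMS", "jobs:high", "jobs:normal", "jobs:low", ">", ">", ">"
  ])
end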
The OTP supervision tree you have now is the skeleton that all of this hangs on. Add a new capability → add a supervised child. Something breaks → the tree heals it. That's the promise, and it's not magic — it's just processes all the way down.