DEV Community

丁久

Posted on • Originally published at dingjiu1989-hue.github.io

Kafka vs RabbitMQ vs Apache Pulsar

This article was originally published on AI Study Room. For the full version with working code examples and related articles, visit the original post.

Introduction

Message brokers are the nervous system of distributed architectures, enabling asynchronous communication between services at scale. Apache Kafka, RabbitMQ, and Apache Pulsar represent three distinct approaches to message processing. Choosing the wrong broker leads to an architecture that fights against the tool's strengths. This article provides a technical comparison to guide your decision.

Architecture and Message Model

Apache Kafka

Kafka uses a distributed commit log model with partitioned topics:

# Kafka topic configuration
topic_config:
  name: orders
  partitions: 12
  replication_factor: 3
  configs:
    cleanup.policy: delete
    retention.ms: 604800000  # 7 days
    retention.bytes: 1073741824  # 1 GB
    compression.type: snappy
    min.insync.replicas: 2
    max.message.bytes: 1048576  # 1 MB

# Producer configuration
producer_config:
  acks: all           # Wait for all replicas
  retries: 3
  batch.size: 16384   # 16KB
  linger.ms: 5        # Wait up to 5ms to batch
  compression: snappy
  enable.idempotence: true  # Exactly-once semantics
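The YAML above is declarative; as a rough sketch, the same producer settings expressed with the `kafka-python` client might look like this (the broker addresses, topic name, and `send_order` helper are illustrative, and note that `kafka-python` does not expose `enable.idempotence`):

```python
import json

def serialize(value):
    """Encode a payload as UTF-8 JSON for the wire."""
    return json.dumps(value).encode('utf-8')

def send_order(order: dict) -> None:
    """Publish one order with settings mirroring the YAML above."""
    from kafka import KafkaProducer  # requires the kafka-python package

    producer = KafkaProducer(
        bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
        acks='all',                 # wait for all in-sync replicas
        retries=3,
        batch_size=16384,           # 16 KB batches
        linger_ms=5,                # wait up to 5 ms to fill a batch
        compression_type='snappy',
        value_serializer=serialize,
    )
    future = producer.send('orders', order)
    future.get(timeout=10)          # block until the broker acknowledges
    producer.flush()
    producer.close()
```

`acks='all'` together with `min.insync.replicas: 2` means a send is only acknowledged once at least two replicas have the record, trading a little latency for durability.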

Kafka consumers track their position via offsets, enabling replay:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    group_id='order-processor',
    enable_auto_commit=False,  # Manual offset management
    auto_offset_reset='earliest',  # Start from beginning if no offset
    max_poll_records=500,
    session_timeout_ms=30000,
)

for message in consumer:
    process_order(message.value)
    # Commit offset after successful processing
    consumer.commit()

RabbitMQ

RabbitMQ uses a message broker model with exchanges and queues:

import json

import pika

# Connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='rabbitmq-1',
        port=5672,
        credentials=pika.PlainCredentials('user', 'pass'),
        heartbeat=600,
        blocked_connection_timeout=300,
    )
)
channel = connection.channel()

# Declare exchange and queue
channel.exchange_declare(
    exchange='orders',
    exchange_type='topic',
    durable=True,
)

channel.queue_declare(
    queue='order-processing',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',  # Highly available queue
        'x-message-ttl': 86400000,  # 24 hours
        'x-dead-letter-exchange': 'orders-dlx',
    }
)

# Bind queue to exchange with routing key
channel.queue_bind(
    exchange='orders',
    queue='order-processing',
    routing_key='order.created.*',
)

# Publish message with delivery mode 2 for persistence
channel.basic_publish(
    exchange='orders',
    routing_key='order.created.europe',
    body=json.dumps(order_data),
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent
        content_type='application/json',
        priority=5,
    ),
)
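The `order.created.*` binding above relies on topic-exchange wildcard semantics: `*` matches exactly one dot-separated word, and `#` matches zero or more words. A small pure-Python sketch of that matching logic (an illustration of the rules, not RabbitMQ's actual implementation):

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic-exchange binding pattern matches a routing key."""
    return _match(pattern.split('.'), routing_key.split('.'))

def _match(pat, key):
    if not pat:
        return not key           # pattern exhausted: match iff key is too
    head, rest = pat[0], pat[1:]
    if head == '#':
        # '#' matches zero or more words: try every possible split
        return any(_match(rest, key[i:]) for i in range(len(key) + 1))
    if not key:
        return False             # words left in pattern but key is empty
    if head == '*' or head == key[0]:
        return _match(rest, key[1:])
    return False

print(topic_matches('order.created.*', 'order.created.europe'))  # True
print(topic_matches('order.created.*', 'order.created'))         # False
print(topic_matches('order.#', 'order.created.europe'))          # True
```

This is why the publish with routing key `order.created.europe` above lands in the `order-processing` queue, while a bare `order.created` would not.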

Apache Pulsar

Pulsar separates compute and storage with a two-layer architecture:

import pulsar
from pulsar.schema import Record, Integer, String

# Message schema (a minimal example definition; the original
# snippet assumes an Order record defined elsewhere)
class Order(Record):
    order_id = Integer()
    customer = String()

# Pulsar client
client = pulsar.Client(
    'pulsar://pulsar-broker:6650',
    authentication=pulsar.AuthenticationToken('token'),
)

# Producer with schema
producer = client.create_producer(
    'persistent://public/default/orders',
    schema=pulsar.schema.JsonSchema(Order),
    send_timeout_millis=30000,
    batching_enabled=True,
    batching_max_publish_delay_ms=10,
    compression_type=pulsar.CompressionType.LZ4,
)

# Consumer with subscription type
consumer = client.subscribe(
    'persistent://public/default/orders',
    subscription_name='order-processor',
    subscription_type=pulsar.SubscriptionType.Shared,  # Load balanced
    initial_position=pulsar.InitialPosition.Earliest,
)

# Negative acknowledgement for retry
while True:
    msg = consumer.receive()
    try:
        order = msg.value()
        process_order(order)
        consumer.acknowledge(msg)
    except Exception:
        consumer.negative_acknowledge(msg)  # Requeue for retry
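With a `Shared` subscription, the broker load-balances messages across consumers with no ordering guarantee; a `Key_Shared` subscription instead routes all messages with the same key to the same consumer, preserving per-key order. A toy illustration of that key-stickiness idea (Pulsar's actual hashing and hash-range assignment differ):

```python
import hashlib

def pick_consumer(key: str, consumers: list) -> str:
    """Map a message key to a consumer deterministically, so every
    message for one key is processed in order on a single consumer."""
    digest = int(hashlib.md5(key.encode('utf-8')).hexdigest(), 16)
    return consumers[digest % len(consumers)]

consumers = ['consumer-a', 'consumer-b', 'consumer-c']
# the same key always routes to the same consumer
assert pick_consumer('customer-42', consumers) == pick_consumer('customer-42', consumers)
```

If per-key ordering matters for your orders (e.g. all events for one customer), `Key_Shared` gives you parallelism without losing that ordering.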

Performance Comparison

| Metric | Kafka | RabbitMQ | Pulsar |
| --- | --- | --- | --- |
| Max throughput (single partition) | ~100 MB/s | ~10 MB/s | ~100 MB/s |
| End-to-end latency (p99) | 5-50 ms | <1 ms | 5-20 ms |
| Max message size | 1 MB (default, configurable) | 128 MB (default) | 5 MB (default) |
| Partition scaling | Add partitions (existing data is not moved) | Cluster of nodes | Segmented (no rebalancing) |
| Storage efficiency | High (zero-copy) | Medium | Very high (tiered storage) |
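The throughput row translates directly into capacity planning. A back-of-the-envelope sketch (the ~100 MB/s per-partition figure is the table's estimate for Kafka, and the 1.5x headroom factor is an assumption):

```python
import math

def partitions_needed(target_mb_s: float,
                      per_partition_mb_s: float = 100.0,
                      headroom: float = 1.5) -> int:
    """Rough partition count: target throughput times a headroom
    factor, divided by per-partition throughput, rounded up."""
    return math.ceil(target_mb_s * headroom / per_partition_mb_s)

# 400 MB/s of orders at ~100 MB/s per partition with 1.5x headroom
print(partitions_needed(400))  # 6
```

Since partitions are also the unit of consumer parallelism, it is common to overprovision beyond this floor; what you cannot cheaply do in Kafka is shrink the count later.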

Message Persistence and Durability

| Feature | Kafka | RabbitMQ | Pulsar |
| --- | --- | --- | --- |
| Default persistence | Disk (all messages) | Memory (configurable) | Disk (all messages) |
| Replication | Partition-based | Queue mirroring / quorum queues | BookKeeper (segments) |
| Data retention | Time/size based | Queue TTL + DLQ | Time/size + tiered storage |
| Exactly-once | Yes (idempotent producer) | No (at-least-once by default) | Yes (broker deduplication) |
| Message replay | Yes (offset reset) | No (consumed messages are deleted) | Yes (cursor rewind) |

Read the full article on AI Study Room for complete code examples, comparison tables, and related resources.

Found this useful? Check out more developer guides and tool comparisons on AI Study Room.
