How I Built a Real-Time DDoS Detection Engine from Scratch

Introduction

Imagine you run a cloud storage platform. Thousands of users upload files, share documents, and collaborate every day. Then one morning, a single IP address sends 500 requests in 60 seconds. Your server slows to a crawl. Users can't log in. Files won't upload. You're under attack.

This is a denial-of-service attack; when the flood comes from many machines at once, it's a DDoS (Distributed Denial of Service). The goal is simple: send so many requests that the server can't serve real users anymore.

In this post, I'll walk you through how I built an anomaly detection engine that watches all incoming HTTP traffic in real time, learns what normal looks like, and automatically blocks attackers — all without any third-party security libraries.

Here's what the system does:

  • Reads Nginx access logs line by line as they are written
  • Tracks request rates using sliding time windows
  • Learns normal traffic patterns using a rolling statistical baseline
  • Detects anomalies using z-score math
  • Blocks attacking IPs using Linux firewall rules (iptables)
  • Sends Slack alerts within 10 seconds
  • Shows a live web dashboard

Let's break down each piece.


The Architecture

Before diving into code, here's how all the pieces connect:

Internet → Nginx (reverse proxy) → Nextcloud
               ↓ writes JSON logs
          /var/log/nginx/hng-access.log
               ↓ reads logs
          Detector Daemon (Python)
          ├── monitor.py   — tails the log file
          ├── baseline.py  — learns what normal looks like
          ├── detector.py  — spots anomalies
          ├── blocker.py   — blocks IPs with iptables
          ├── unbanner.py  — auto-releases bans
          ├── notifier.py  — sends Slack alerts
          └── dashboard.py — live web UI

Nginx sits in front of Nextcloud and writes every HTTP request as a JSON line to a log file. The detector daemon tails that file continuously, processes each line, and acts when something looks wrong.

Everything runs in Docker. Nginx, Nextcloud, and the detector are three separate containers sharing a named volume so the log file is accessible to all of them.


Step 1 — Reading the Log File in Real Time

The first challenge is reading a log file as it grows. This is called "tailing" — like running tail -f in your terminal.

Here's the core idea:

import time

f = open("/var/log/nginx/hng-access.log", "r")
f.seek(0, 2)  # jump to the end of the file

while True:
    line = f.readline()
    if line:
        process(line)
    else:
        time.sleep(0.05)  # nothing new yet, wait 50ms and try again

f.seek(0, 2) moves the file cursor to the very end. This means we only see new lines written after the daemon starts — not the entire history.

readline() returns an empty string "" when there's nothing new. That's our signal to sleep briefly and retry.

Handling log rotation: Nginx periodically rotates its log file (renames the old one, creates a new one). If we don't handle this, we'd keep reading the old file forever. The fix is to compare the file's inode (a unique ID the OS assigns to every file) on each empty read:

import os

current_inode = os.stat(log_path).st_ino
if current_inode != saved_inode:
    f.close()
    f = open(log_path, "r")  # reopen the new file from the beginning
    saved_inode = current_inode

Each parsed JSON line gives us: source_ip, timestamp, method, path, status, response_size.
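
Putting the two ideas together, here's a minimal self-contained tailer; process is whatever callback parses the JSON line (a sketch, not the repo's exact monitor.py):

import os
import time

def tail(log_path, process):
    """Follow log_path forever, calling process() on each new line.
    Reopens the file when log rotation swaps the inode."""
    f = open(log_path, "r")
    f.seek(0, 2)                        # start at the end: only new lines
    saved_inode = os.fstat(f.fileno()).st_ino

    while True:
        line = f.readline()
        if line:
            process(line)
            continue
        time.sleep(0.05)                # nothing new yet
        try:
            current_inode = os.stat(log_path).st_ino
        except FileNotFoundError:
            continue                    # mid-rotation: new file not created yet
        if current_inode != saved_inode:
            f.close()
            f = open(log_path, "r")     # rotated: read the new file from the start
            saved_inode = current_inode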


Step 2 — The Sliding Window

Now that we're reading log lines, we need to answer one question per request:

How many requests has this IP sent in the last 60 seconds?

A naive approach would be to store a counter per IP and reset it every minute. But that's wrong — it doesn't give you a true 60-second window. If an attacker sends 499 requests at 11:59:59 and 1 request at 12:00:01, a per-minute counter resets and misses the spike.

The correct approach is a sliding window using a deque (double-ended queue).

Here's the idea:

from collections import deque
import time

ip_windows = {}  # one deque per IP

def record_request(ip):
    now = time.time()
    if ip not in ip_windows:
        ip_windows[ip] = deque()

    # add this request's timestamp to the right
    ip_windows[ip].append(now)

    # evict timestamps older than 60 seconds from the left
    cutoff = now - 60
    while ip_windows[ip] and ip_windows[ip][0] < cutoff:
        ip_windows[ip].popleft()

    # the length is now the exact request count in the last 60 seconds
    return len(ip_windows[ip])

Visually, the deque looks like this:

ip_windows["1.2.3.4"] = deque([
    1714000001.1,    oldest (left side)
    1714000001.9,
    1714000002.3,
    1714000059.8     newest (right side)
])

As time passes, old timestamps fall off the left. New ones are added to the right. len(deque) always gives you the exact count for the last 60 seconds.

Why deque and not a regular list? Because deque is O(1) on both ends — appending to the right and removing from the left are both instant, regardless of how many items are in it. A regular list's pop(0) is O(n) — it shifts every element left, which gets slow under heavy traffic.

We maintain three sets of windows:

  • One deque per IP (per-IP request rate)
  • One global deque (total traffic rate across all IPs)
  • One error deque per IP (only 4xx/5xx responses), sketched below
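
The per-IP and global windows reuse record_request() from above. A minimal sketch of the error window, assuming each parsed line carries its HTTP status:

from collections import deque
import time

error_windows = {}   # one deque of error timestamps per IP

def record_error(ip, status):
    """Track only 4xx/5xx responses so error surges can tighten detection later."""
    if status < 400:
        return 0
    now = time.time()
    window = error_windows.setdefault(ip, deque())
    window.append(now)
    cutoff = now - 60
    while window and window[0] < cutoff:
        window.popleft()
    return len(window)   # this IP's errors in the last 60 seconds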

Step 3 — The Rolling Baseline

The sliding window tells us the current rate. But is that rate normal or abnormal? To answer that, we need to know what normal looks like — and that changes over time. Traffic at 3am is different from traffic at 3pm.

This is the rolling baseline: a statistical model of recent traffic that updates automatically.

How it works:

Every second, we count how many requests arrived in that second and store it as a bucket:

from collections import defaultdict

counts = defaultdict(int)  # per-second request counts
second_bucket = int(time.time())
counts[second_bucket] += 1

Every 60 seconds, we look at the last 30 minutes of these per-second counts (up to 1800 buckets) and compute:

from math import sqrt

values = list(counts.values())  # the per-second counts themselves
mean   = sum(values) / len(values)
stddev = sqrt(sum((c - mean)**2 for c in values) / len(values))

mean is the average requests per second over the last 30 minutes. stddev (standard deviation) measures how much the traffic varies — a low stddev means traffic is steady, a high stddev means it's spiky.

Per-hour slots: We also store counts grouped by UTC hour. If the current hour has 60 or more data points, we prefer it over the mixed 30-minute window. This makes the baseline more accurate — 2am traffic shouldn't influence the 2pm baseline.

Floor values: On a quiet server, mean and stddev could be very close to zero. If stddev is 0, dividing by it in the z-score formula would crash the program. So we apply a floor:

mean   = max(computed_mean,   1.0)
stddev = max(computed_stddev, 0.5)

This ensures the math always works, even on a server with almost no traffic.
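
Here's a minimal sketch of the whole baseline, assuming per-second counts arrive via a record_second() helper (an illustrative name); the hour-slot preference and floors follow the rules above:

import math
import time
from collections import defaultdict, deque

second_counts = deque(maxlen=1800)   # the mixed window: last 30 minutes of per-second counts
hour_counts = defaultdict(list)      # UTC hour -> per-second counts seen during that hour

def record_second(count):
    """Called once per second with that second's request count."""
    second_counts.append(count)
    hour = time.gmtime().tm_hour
    hour_counts[hour].append(count)
    hour_counts[hour] = hour_counts[hour][-1800:]   # cap memory per hour slot

def compute_baseline():
    """Recalculated every 60 seconds. Prefers the current hour's slot when it
    has at least 60 data points; applies floors so the z-score never divides by zero."""
    hour = time.gmtime().tm_hour
    samples = hour_counts[hour] if len(hour_counts[hour]) >= 60 else list(second_counts)
    if not samples:
        return 1.0, 0.5              # nothing observed yet: floors alone
    mean = sum(samples) / len(samples)
    stddev = math.sqrt(sum((c - mean) ** 2 for c in samples) / len(samples))
    return max(mean, 1.0), max(stddev, 0.5)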


Step 4 — Detecting Anomalies

Now we have:

  • current_rate — requests from this IP in the last 60 seconds
  • mean — average requests per second over the last 30 minutes
  • stddev — how much traffic normally varies

We use z-score to decide if the current rate is abnormal:

z = (current_rate - mean) / stddev

The z-score tells you how many standard deviations above normal the current rate is. A z-score of 1.0 means slightly above average. A z-score of 3.0 means extremely unusual — statistically, only about 0.1% of normally distributed samples land more than three standard deviations above the mean.

We flag an IP as anomalous if either condition fires:

z = (ip_rate - mean) / stddev
if z > 3.0 or ip_rate > 5 * mean:
    handle_anomaly(ip)  # anomaly detected (handle_anomaly is a placeholder)

The 5x mean rule is a safety net — if traffic is so extreme that even a high stddev wouldn't catch it, the multiplier rule fires first.

Error surge tightening: If an IP is also generating a lot of 4xx/5xx errors (failed login attempts, scanning for vulnerabilities), we tighten the detection threshold from 3.0 down to 2.0. This means we act faster on IPs that are both high-volume and generating errors.
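
Pulling the rules together, a minimal sketch of the per-IP check (the error-count cutoff of 10 is an assumed value, not taken from the repo):

def is_anomalous(ip_rate, error_count, mean, stddev):
    """True if this IP's 60-second rate is statistically extreme.
    IPs that are also generating errors get a tighter z threshold."""
    z = (ip_rate - mean) / stddev
    z_threshold = 2.0 if error_count >= 10 else 3.0   # the 10-errors cutoff is an assumption
    return z > z_threshold or ip_rate > 5 * mean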

The same logic applies globally — if total traffic across all IPs spikes, we send a Slack alert (but don't block a single IP since the attack may be distributed).


Step 5 — Blocking with iptables

When an IP is flagged as anomalous, we block it at the Linux kernel level using iptables.

iptables is Linux's built-in firewall. It processes every network packet before it reaches your application. A DROP rule tells the kernel to silently discard all packets from a specific IP — they never reach Nginx, never reach Nextcloud, never consume any application resources.

The command to block an IP:

iptables -I INPUT -s 1.2.3.4 -j DROP

Breaking this down:

  • -I INPUT — insert a rule into the INPUT chain (incoming traffic)
  • -s 1.2.3.4 — match packets from this source IP
  • -j DROP — silently drop them (no response sent to the attacker)

In Python, we run this as a subprocess:

import subprocess

subprocess.run(
    ["iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"],
    check=True,
    capture_output=True,
)

We use a list (not a shell string) to avoid shell injection vulnerabilities. check=True raises an exception if the command fails so we can log it.

To unban:

iptables -D INPUT -s 1.2.3.4 -j DROP

-D deletes the rule instead of inserting it.
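
On the Python side, the unban is the same subprocess call with -D swapped in:

import subprocess

subprocess.run(
    ["iptables", "-D", "INPUT", "-s", ip, "-j", "DROP"],
    check=True,
    capture_output=True,
)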


Step 6 — Auto-Unban with Backoff

We don't ban IPs forever (unless they keep attacking). The unban schedule uses exponential backoff:

Offense | Ban Duration
--------|-------------
1st     | 10 minutes
2nd     | 30 minutes
3rd     | 2 hours
4th+    | Permanent
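
As a sketch, the schedule can live in a small lookup (offense counts assumed to start at 1):

BAN_SCHEDULE_MINUTES = [10, 30, 120]       # 1st, 2nd, 3rd offense

def ban_duration_minutes(offense_count):
    """Return the ban length in minutes, or None for a permanent ban."""
    if offense_count > len(BAN_SCHEDULE_MINUTES):
        return None                         # 4th offense and beyond: permanent
    return BAN_SCHEDULE_MINUTES[offense_count - 1]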

A background thread checks every 30 seconds whether any ban has expired:

now = time.time()
for ip, record in list(banned_ips.items()):  # copy: unban() removes entries mid-loop
    elapsed_minutes = (now - record["banned_at"]) / 60
    if elapsed_minutes >= record["duration_minutes"]:
        unban(ip)
        send_slack_unban_alert(ip)

The offense count persists across ban/unban cycles. So if an IP gets banned, serves its 10-minute ban, gets released, and attacks again — the next ban is 30 minutes.


Step 7 — Slack Alerts

Every ban, unban, and global anomaly sends a Slack message via webhook. The webhook is just an HTTPS POST to a URL Slack gives you:

import requests

requests.post(
    webhook_url,
    json={"text": ":rotating_light: *IP BANNED* ..."},
    timeout=8,
)

timeout=8 ensures we complete within the 10-second requirement even if Slack is slow. The POST runs in a background thread so it never blocks the detector loop.
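
A sketch of that fire-and-forget pattern (notify_async is an illustrative name, not necessarily the repo's):

import threading
import requests

def notify_async(webhook_url, text):
    """Send the Slack POST off-thread so the detector loop never waits on it."""
    def _post():
        try:
            requests.post(webhook_url, json={"text": text}, timeout=8)
        except requests.RequestException:
            pass   # alerting failures must never crash detection
    threading.Thread(target=_post, daemon=True).start()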

A ban alert looks like this in Slack:

🚨 IP BANNED
• IP: 203.0.113.42
• Condition: z=4.21
• Current rate: 312 req/60s
• Baseline mean: 8.3421 | stddev: 2.1500
• Ban duration: 10 min
• Timestamp: 2025-04-20 14:32:01 UTC

Step 8 — The Live Dashboard

The dashboard is a FastAPI web app with two endpoints:

  • GET /metrics — returns live JSON data (a minimal sketch follows this list)
  • GET / — returns an HTML page that polls /metrics every 3 seconds
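
Here's a minimal sketch of the metrics endpoint; state_lock, global_window, banned_ips, and the baseline values are assumed names for the shared state described earlier, not the repo's actual identifiers:

from fastapi import FastAPI

app = FastAPI()

@app.get("/metrics")
def metrics():
    # snapshot shared state under the lock so the detector threads aren't racing us
    with state_lock:
        return {
            "global_req_per_60s": len(global_window),
            "banned_ips": list(banned_ips.keys()),
            "baseline_mean": baseline_mean,
            "baseline_stddev": baseline_stddev,
        }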

The HTML page uses plain JavaScript to fetch /metrics and update the DOM without reloading the page:

async function refresh() {
    const r = await fetch('/metrics');
    const d = await r.json();
    document.getElementById('global-rate').textContent = d.global_req_per_60s;
    // ... update other elements
}
setInterval(refresh, 3000);

The dashboard shows:

  • Banned IPs with ban time, duration, and offense count
  • Global requests per 60 seconds
  • Top 10 source IPs
  • CPU and memory usage
  • Effective baseline mean and stddev
  • Daemon uptime

Putting It All Together

All the modules run as threads inside a single Python process:

main.py
├── Thread: monitor     → tails log, pushes events to queue
├── Thread: detector    → consumes queue, runs detection
├── Thread: baseline    → recalculates every 60 seconds
├── Thread: unbanner    → checks ban expiry every 30 seconds
└── Thread: dashboard   → uvicorn serving FastAPI on port 8080

The main thread just keeps the process alive with while True: sleep(1) and handles KeyboardInterrupt for clean shutdown.

All shared state (banned IPs, sliding windows, baseline values) is protected by threading.Lock() to prevent race conditions between threads.
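
For example, the ban bookkeeping might be guarded like this (names are illustrative):

import threading
import time

state_lock = threading.Lock()
banned_ips = {}

def record_ban(ip, duration_minutes, offense_count):
    """Written by the detector thread; read by the unbanner and dashboard threads."""
    with state_lock:
        banned_ips[ip] = {
            "banned_at": time.time(),
            "duration_minutes": duration_minutes,
            "offense_count": offense_count,
        }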


Key Takeaways

  • Sliding windows with deques give you exact per-second rate tracking with O(1) performance
  • Rolling baselines let the system adapt to real traffic patterns instead of relying on hardcoded thresholds
  • Z-score detection is statistically sound — it flags things that are genuinely unusual relative to recent history
  • iptables blocks at the kernel level — the most efficient place to stop an attack
  • Backoff unbanning balances security with fairness — short bans for first offenses, longer for repeat attackers

The full source code is available at: https://github.com/nielvid/anomaly-detector


Setup & Walkthrough for Beginners

Now that you understand how the system works, let's get it running on your own machine. This step-by-step guide assumes you're on Ubuntu 22.04 or 24.04 (the instructions work on most Linux distributions with minor adjustments).

Prerequisites

Before starting, make sure you have:

  1. A Linux machine (physical, VM, or cloud instance like DigitalOcean, AWS EC2, or Linode)
  2. Root or sudo access (iptables commands require privileges)
  3. Docker and Docker Compose installed

If Docker isn't installed yet, run:

sudo apt update
sudo apt install -y docker.io docker-compose
sudo systemctl enable --now docker
sudo usermod -aG docker $USER
# Log out and back in for group changes to take effect

Step 1: Clone the Repository

git clone https://github.com/nielvid/anomaly-detector.git
cd anomaly-detector

Step 2: Configure Nginx to Write JSON Logs

Nginx needs to output logs in JSON format so the Python detector can parse them easily. Create or edit /etc/nginx/nginx.conf and add a custom log format:

sudo nano /etc/nginx/nginx.conf

Inside the http block, add:

http {
    log_format json escape=json '{'
        '"timestamp":"$time_iso8601",'
        '"source_ip":"$remote_addr",'
        '"method":"$request_method",'
        '"path":"$request_uri",'
        '"status":$status,'
        '"response_size":$body_bytes_sent'
    '}';

    access_log /var/log/nginx/hng-access.log json;
}

Then restart Nginx:

sudo systemctl restart nginx

Verify it's working: Generate some traffic by visiting your server's IP in a browser, then run:

sudo tail -n 2 /var/log/nginx/hng-access.log

You should see JSON lines like:

{"timestamp":"2025-04-20T14:32:01+00:00","source_ip":"192.168.1.100","method":"GET","path":"/","status":200,"response_size":1234}

Step 3: Set Up the Python Environment

The detector runs outside Docker for simplicity in this beginner walkthrough. Install Python dependencies:

cd ~/anomaly-detector
python3 -m venv venv
source venv/bin/activate
pip install fastapi uvicorn requests psutil

Step 4: Configure Slack Alerts (Optional But Recommended)

  1. Go to https://api.slack.com/apps
  2. Click "Create New App" → "From Scratch"
  3. Name it "Anomaly Detector" and choose your workspace
  4. In the left sidebar, click "Incoming Webhooks"
  5. Toggle "Activate Incoming Webhooks" to On
  6. Click "Add New Webhook to Workspace"
  7. Choose a channel (like #alerts or #security)
  8. Copy the webhook URL (looks like https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX)

Create a config file:

nano ~/anomaly-detector/config.py

Add this content (replace the webhook URL with yours):

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/YOUR/URL/HERE"
DETECTION_WINDOW_SECONDS = 60
BASELINE_MINUTES = 30
ALERT_COOLDOWN_SECONDS = 300  # Don't spam the same alert

Step 5: Run the Detector

First, make sure the detector can read the log file (or run it with sudo):

sudo chmod 644 /var/log/nginx/hng-access.log

Now start the detector:

cd ~/anomaly-detector
sudo ./venv/bin/python3 main.py   # sudo is needed for iptables; calling the venv's interpreter directly keeps its packages visible under sudo

You should see output like:

[2025-04-20 14:30:01] Monitor thread started, tailing /var/log/nginx/hng-access.log
[2025-04-20 14:30:01] Baseline thread started
[2025-04-20 14:30:01] Dashboard running at http://0.0.0.0:8080
[2025-04-20 14:30:01] Unbanner thread started

Step 6: Generate Test Traffic to See Detection in Action

Open a second terminal and use curl to simulate an attack:

# Normal traffic (won't trigger a ban)
for i in {1..10}; do curl -s http://localhost/ > /dev/null; sleep 1; done

# Simulated DDoS attack (will trigger a ban)
for i in {1..500}; do curl -s http://localhost/ > /dev/null & done
wait

Within 60 seconds, the detector will:

  1. Notice the spike from your IP
  2. Calculate a high z-score (likely > 10)
  3. Run iptables -I INPUT -s YOUR_IP -j DROP
  4. Send a Slack alert (if configured)
  5. Show the ban in the dashboard

Step 7: View the Live Dashboard

Open a web browser and navigate to:

http://YOUR_SERVER_IP:8080

You'll see real-time metrics including:

  • Global request rate
  • Top talking IPs
  • Currently banned IPs with remaining time
  • Current baseline (mean and stddev)

Step 8: Verify iptables Block Is Working

In the terminal, check if your IP is blocked:

sudo iptables -L INPUT -n -v

You should see a line like:

Chain INPUT (policy ACCEPT 0 packets, 0 bytes)
pkts bytes target     prot opt in     out     source               destination
  15  1260 DROP       all  --  *      *       YOUR_IP              0.0.0.0/0

To test that traffic is actually dropped, try curling from the blocked IP — the command will hang until it times out.

Step 9: Test Auto-Unban

Wait 10 minutes (or modify the ban duration in the code to 1 minute for testing). The unbanner thread will automatically remove the ban:

# After ban expires, check iptables again
sudo iptables -L INPUT -n -v
# The DROP rule should be gone

Step 10: Run as a System Service (Production)

To keep the detector running after you log out, create a systemd service:

sudo nano /etc/systemd/system/anomaly-detector.service

Add this content:

[Unit]
Description=Anomaly Detection Engine
After=network.target nginx.service

[Service]
Type=simple
User=root
WorkingDirectory=/home/YOUR_USERNAME/anomaly-detector
ExecStart=/home/YOUR_USERNAME/anomaly-detector/venv/bin/python3 /home/YOUR_USERNAME/anomaly-detector/main.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

Enable and start it:

sudo systemctl daemon-reload
sudo systemctl enable anomaly-detector
sudo systemctl start anomaly-detector
sudo systemctl status anomaly-detector

Common Troubleshooting

Problem: Permission denied when opening log file

sudo chmod 644 /var/log/nginx/hng-access.log
# Or run the detector with sudo, using the venv's interpreter
sudo ./venv/bin/python3 main.py

Problem: iptables: Command not found

sudo apt install iptables

Problem: Detector can't see new log lines

# Check if log rotation is moving the file
ls -la /var/log/nginx/
# The detector should auto-detect inode changes

Problem: Dashboard shows 0 requests

  • Generate some traffic first
  • Verify the log file path in main.py matches your Nginx log location
  • Check that Nginx is actually writing JSON format

Problem: Too many false positives

  • Increase the z-score threshold from 3.0 to 4.0
  • Increase baseline minutes from 30 to 60
  • Whitelist known good IPs in the detector code (see the sketch below)
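
A minimal whitelist check, assuming a hardcoded set (the addresses are examples):

WHITELIST = {"203.0.113.10", "198.51.100.7"}   # example trusted IPs

def should_consider(ip):
    """Skip detection entirely for trusted sources."""
    return ip not in WHITELIST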

Testing Locally Without a Real Server

If you don't have a Linux server handy, you can test everything in a Docker container:

# Create a test Docker container with Ubuntu
docker run -it --rm --cap-add=NET_ADMIN ubuntu:22.04 bash

# Inside the container:
apt update && apt install -y iptables nginx python3 python3-pip
# Then follow steps 2-6 above

The --cap-add=NET_ADMIN is required for iptables to work inside Docker.

Next Steps & Customization

Once you have the basic system running, here are ways to extend it:

  1. Add IP whitelisting — Prevent your office or trusted partners from being banned
  2. Persist bans across restarts — Save banned IPs to a SQLite database
  3. Add rate limiting per path — Block IPs that hammer /login but allow /static
  4. Integrate with fail2ban — Use the detector as a faster, smarter rules engine
  5. Add geolocation — Block or alert on traffic from unexpected countries

The full source code with all modules is available at: https://github.com/nielvid/anomaly-detector


Built as part of the HNG DevOps Track — Stage 3
