The real problem with ingesting MongoDB into Delta Lake (and how I built a library to fix it)

Luiz Oliveira

If you've ever built ETL pipelines pulling data from MongoDB into Delta Lake using Spark, you've probably hit this wall. The pipeline works fine — until it doesn't. A single document with an unexpected shape is enough to break the entire write, leave the table in an inconsistent state, and send your on-call engineer digging through Spark logs at 11pm.

I built and maintained more than 10 of these jobs in my last role. After solving the same problem manually across every single one, I decided to build the abstraction that should have existed from the start: nosql-delta-bridge.

pip install nosql-delta-bridge

The problem isn't bad data — it's structural

MongoDB's schema-free nature is a feature for application developers. For pipelines, it's a minefield. The problems came in three flavors:

1. Polymorphic fields

Some collections had fields typed as anyOf[object|bool|string] in the JSON Schema — completely valid from the application's perspective. A status field might be a string in older documents and an integer in newer ones. A value field might be a number, a boolean, or a nested object depending on which part of the application wrote it.

Spark infers the schema from a sample at read time, commits to it, and the moment a document outside that sample has a different type, the entire write fails:

AnalysisException: Cannot cast StringType to IntegerType

The only safe workaround was casting everything to StringType defensively — which meant no type guarantees in the raw Delta table and re-casting in every downstream job.
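That defensive cast looked roughly like this (a sketch, not the exact job code; the table path is made up):

from pyspark.sql.functions import col

# Force every column to StringType so the write never fails on a type
# conflict. The cost: no type guarantees in the raw Delta table, and every
# downstream job has to re-cast.
df_safe = df.select([col(c).cast("string").alias(c) for c in df.columns])
df_safe.write.format("delta").mode("append").save("/delta/raw/payments")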

2. Inconsistent nested structs

Arrays of structs where fields appeared or disappeared depending on the document version. A subfield present in some documents, missing in others. Nested structs with subfields that changed shape across batches.

Every job ended up with the same boilerplate:

from pyspark.sql.functions import struct, coalesce, col, lit

def rebuild_struct(df, field, schema):
    # Rebuild the nested struct field by field: cast each expected subfield
    # to its target type and fill anything missing or null with a typed null.
    return df.withColumn(
        field,
        struct([
            coalesce(col(f"{field}.{f}"), lit(None).cast(t)).alias(f)
            for f, t in schema.items()
        ])
    )

Rebuild the struct by hand. Cast every field explicitly. Handle missing fields with lit(None). Drop fields that appeared in some batches but not others. Repeat across every collection.
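A typical call, with a hypothetical target shape for an inconsistent address struct (field names are illustrative; every collection had its own version):

# Expected subfields and their target types for the "address" struct
address_schema = {"city": "string", "zip": "string", "country": "string"}
df = rebuild_struct(df, "address", address_schema)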

3. Silent failures

When the pipeline didn't crash outright, bad documents were silently coerced or dropped. There was no dead-letter queue, no audit trail, no contract that said "this field must be this type." Problems surfaced three jobs downstream — not at the ingestion boundary where they actually happened.


What existing tools don't solve

A common suggestion in this space is to use a data observability tool like Elementary. Elementary is genuinely useful — but it operates at the table/model level. It tells you the table is unhealthy, not which document made it unhealthy.

The investigation workflow without document-level isolation:

  1. Elementary fires an alert — table freshness failed
  2. Engineer checks Spark logs — finds a cast error
  3. Engineer traces back to MongoDB — tries to identify the offending document in a batch of 100k records
  4. Even after finding it — casting it correctly in Spark is either impossible or takes significant work when the schema is inconsistent enough

The inspection step is entirely manual, and finding the problematic document can take hours. And once you find it, you still have to figure out what to do with it while the rest of the batch sits unwritten.


How nosql-delta-bridge works

The core idea is simple: every document either lands in the Delta table or goes to a dead-letter queue with an explicit rejection reason. Nothing is silently dropped. Nothing silently crashes the pipeline.

The workflow has two steps:

Step 1 — Infer a schema contract from known-good historical data

bridge infer historical.json --output payments.schema.json

This generates a schema contract from a sample of documents you trust. The inference engine handles type conflicts using a configurable strategy — by default, the widest type wins and fields are nullable.
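To make "widest type wins" concrete, here is an illustrative resolution rule (a sketch only, not the library's internals):

# Illustrative only: the wider a type, the more values it can represent
# without loss, so conflicts resolve toward the right of this order.
WIDENING_ORDER = ["boolean", "integer", "double", "string"]

def widest(a: str, b: str) -> str:
    return max(a, b, key=WIDENING_ORDER.index)

print(widest("integer", "double"))  # double
print(widest("double", "string"))   # string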

Step 2 — Ingest with validation

bridge ingest incoming.json ./delta/payments \
  --schema payments.schema.json \
  --dlq rejected.ndjson
incoming.json · 1,000 documents · schema: payments.schema.json
  written:   994  →  delta/payments
  rejected:    6  →  rejected.ndjson

The 994 valid documents land in Delta Lake. The 6 that couldn't be reconciled go to the DLQ — with an explicit reason attached to each one:

{
  "_id": "abc123",
  "amount": "99.90",
  "_dlq_reason": "cast failed on 'amount': expected double, got string",
  "_dlq_stage": "coerce",
  "_dlq_ts": "2025-04-28T14:32:01Z"
}

No log archaeology. No manual document hunting. The bad document is already isolated, already labeled, at the exact moment ingestion ran.
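Because the DLQ is plain NDJSON, triage can be as simple as grouping rejects by reason (standard library only, assuming the record layout shown above):

import json
from collections import Counter

# Count rejected documents per reason to decide what to fix first.
reasons = Counter()
with open("rejected.ndjson") as f:
    for line in f:
        reasons[json.loads(line)["_dlq_reason"]] += 1

for reason, count in reasons.most_common():
    print(f"{count:>5}  {reason}")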


What it handles

Scenario                                       Behavior
Field type mismatch (castable)                 Cast applied, document written
Field type mismatch (not castable)             Document → DLQ with reason
Missing required field                         Document → DLQ with reason
New field not in schema                        Configurable: reject or evolve schema
Full type migration (all docs changed type)    0 written, all → DLQ + warning
Nested struct with missing subfield            Filled with null, document written
Array of mixed types                           Configurable: cast to widest or reject

Why pure Python and not Spark

The MongoDB Connector for Apache Spark is the standard approach — but it requires a cluster. Most teams running smaller MongoDB collections don't need a full Spark environment just to move data into Delta Lake.

nosql-delta-bridge uses delta-rs under the hood — a pure Python implementation of the Delta Lake protocol. No cluster required. It runs locally, in a Docker container, or on a small VM. Anyone can clone the repo and run the examples in minutes.
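If you haven't used delta-rs directly, this is roughly what a Spark-free Delta write looks like with the deltalake package (the demo path and data are made up):

import pandas as pd
from deltalake import DeltaTable, write_deltalake

# Write a Delta table from plain Python, no Spark session or cluster involved.
df = pd.DataFrame({"_id": ["abc123"], "amount": [99.90]})
write_deltalake("./delta/payments_demo", df, mode="append")

print(DeltaTable("./delta/payments_demo").to_pandas())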

For large-scale production workloads that already run on Spark, the library-style design means you can wrap it or use its schema inference and coercion logic independently.


Where it fits in your stack

If you're using observability tools downstream, this fits cleanly upstream:

MongoDB
  ↓
nosql-delta-bridge    ← structural validation, DLQ, schema contract
  ↓
Delta Lake
  ↓
dbt models
  ↓
Elementary / Monte Carlo    ← business-level anomaly detection

Elementary tells you the table is sick. nosql-delta-bridge makes sure the table never gets sick from a bad document in the first place — and when it does, tells you exactly which document and why, before it ever touched the table.


Try it

pip install nosql-delta-bridge

If you work with MongoDB → Delta Lake pipelines and want to stress-test it against your own collections, I'd genuinely appreciate it. Especially interested in edge cases — deeply nested structs, arrays of structs with inconsistent shapes, or collections with heavy anyOf variance.

Open an issue on GitHub or leave a comment describing your scenario.

GitHub: https://github.com/lhrick/nosql-delta-bridge

PyPI: https://pypi.org/project/nosql-delta-bridge/


Built this because I got tired of writing the same defensive boilerplate across every MongoDB collection I touched. If you've felt the same pain, I'd love to hear how you've handled it.

Top comments (4)

Gilder Miller

Hi Luiz, thanks for sharing!
I've hit pipeline problems like these before, and this looks like a great solution.
The dead-letter queue with specific rejection reasons is particularly smart - it turns debugging from a mystery into a fixable issue.
How does the library handle schema evolution when new nested fields appear in MongoDB documents over time?

Luiz Oliveira

Hi Gilder, great question! Schema evolution was one of the trickiest parts to get right.

When a new nested field appears (say address.city shows up in a batch where it never existed before), the library doesn't reject the document or crash. Here's what happens step by step:

  1. Coercion passes it through: the coercion step only enforces rules for fields that exist in the contract. Unknown fields are never rejected at this stage.
  2. The batch schema is inferred separately: new nested fields are detected as dot-notation paths (address.city, address.zip), respecting the configured max_depth.
  3. Schemas are merged: old schema + batch schema produces an evolved schema (sketched just after this list). Any field present only in the new batch is added as nullable=True, since it was absent from all historical documents.
  4. Two schemas are used for the write: the coercion schema stays as the original contract (existing field types never relax silently), while the write schema includes the new nullable fields. This lets the Delta table grow without creating type conflicts on existing columns.
  5. The schema file is updated, but only with new fields: old fields are copied verbatim from the original contract. The schema file is treated as a contract, not a snapshot of the latest batch.
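A rough sketch of that merge step (illustrative only, not the library's internals):

# Existing fields keep their contracted type; fields seen only in the new
# batch are appended as nullable. Nothing in the old contract is relaxed.
def merge_schemas(contract: dict, batch: dict) -> dict:
    evolved = dict(contract)
    for field, spec in batch.items():
        if field not in contract:
            evolved[field] = {**spec, "nullable": True}
    return evolved

contract = {"name": {"type": "string"}, "age": {"type": "integer"}}
batch = {"name": {"type": "string"}, "address.city": {"type": "string"}}
print(merge_schemas(contract, batch)["address.city"])
# {'type': 'string', 'nullable': True}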

For example:

[{"name": "Alice", "age": 30},
 {"name": "Bob",   "age": 25, "address": {"city": "SP"}}]

written: 2  →  Alice gets address=null, Bob gets address.city="SP"
DLQ:     0
Schema:  address.city: string, nullable=true added

Next run without address continues to work — those rows just get null for address.city.
The one boundary case worth knowing: if a new nested field is deeper than max_depth (default 5), it's stored as a JSON string column at the truncation point rather than split into leaf columns — consistent with how the flattener handles it.

The repo has an example script you can use to test these scenarios.
Happy to go deeper on any of this if you hit edge cases.

Gilder Miller

Yes, that's a solid approach to schema drift. Writing with an evolved schema while keeping coercion strict shows you've thought through the real-world pain points.
The max_depth fallback to JSON is pragmatic. How often does that become a bottleneck with nested APIs that keep adding levels? I've seen it lock away useful fields when the structure grows beyond the initial limit.

Luiz Oliveira

You're right to flag it; this is a real limitation:

max_depth is configurable (default 5), but hitting the boundary is silent today. When a field exceeds the depth limit, it's stored as a JSON string at the truncation point — still accessible via json_extract, but not a proper typed column. No warning fires.

The CLI already fires a type-widening warning when batch_schema disagrees with old_schema on a type; depth truncation is the same kind of event: nothing failed, but something the user should know happened. I'll work on adding that warning.

You can raise the limit via InferConfig (or --max-depth on the CLI):

bridge infer latest_batch.json --output schema.json --max-depth 8
bridge ingest full_history.json ./delta/devices --schema schema.json --mode overwrite

Historical rows get null for the newly split columns, but the structure is fully recovered.

On the silver layer side, the JSON string fallback is also manageable downstream. Since the raw data is preserved at the truncation point, we can handle the extraction explicitly.
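For example, a plain Python silver job can parse the JSON-string column explicitly (the column name "device_config" is hypothetical; substitute your truncated field):

import json
from deltalake import DeltaTable

# Read the raw Delta table and parse the truncated JSON-string column by hand.
df = DeltaTable("./delta/devices").to_pandas()
df["device_config_parsed"] = df["device_config"].map(
    lambda s: json.loads(s) if isinstance(s, str) else None
)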

Good catch by the way! This kind of feedback is exactly what helps the library mature.