In the world of Data Engineering, handling heterogeneous time-series data is a recurring nightmare. Whether you are a biohacker trying to optimize your sleep or a data scientist building a health app, syncing data from an Oura Ring and a Garmin watch often results in nasty timestamp overlaps, conflicting heart rate readings, and "ghost" activity logs.
Standard interpolation works for smooth curves, but what happens when Garmin says you were running at 140 BPM while Oura says you were napping? This is where LLM-based data cleaning enters the chat. In this guide, we'll build a pipeline using Pandas, Dask, and Instructor to automatically resolve data conflicts and fix outliers with the power of GPT-4o.
Pro-Tip: For more production-ready engineering patterns and advanced health-tech data architectures, definitely check out the deep dives over at the WellAlly Blog.
The Architecture: The "Smart" Cleaning Pipeline
Before we dive into the code, let's visualize how we move from messy, overlapping CSVs to a unified, clean time-series dataset.
```mermaid
graph TD
    A[Oura Ring Data] --> C(Time-Series Alignment)
    B[Garmin Connect Data] --> C
    C --> D{Conflict Detected?}
    D -- No --> E[Linear Interpolation]
    D -- Yes --> F[Instructor/LLM Agent]
    F --> G[Context-Aware Repair]
    E --> H[Final Merged DataFrame]
    G --> H
    H --> I[Dask for Parallel Processing]
    I --> J[Clean Bio-Data API/Dashboard]
```
The Tech Stack
- Pandas: Our bread and butter for data manipulation.
- Dask: To handle larger-than-memory datasets and parallelize LLM calls.
- Instructor: A brilliant library that uses Pydantic to force LLMs to return structured data.
- Python: The glue holding it all together.
Step 1: Defining the Conflict Schema
Standard cleaning scripts fail because they lack "context." An LLM, however, understands that if your step_count is 5000 but your heart_rate is 55, one of those sensors is lying.
We use Instructor to define a schema that the LLM must follow when resolving conflicts.
```python
from pydantic import BaseModel, Field
import instructor
from openai import OpenAI

# Patch the OpenAI client so responses are validated against a Pydantic model
client = instructor.from_openai(OpenAI())

class DataRepair(BaseModel):
    resolved_value: float = Field(..., description="The corrected value for the metric.")
    reasoning: str = Field(..., description="Explanation of why this device's data was chosen or why an average was used.")
    is_outlier: bool = Field(..., description="Whether the original data point was a sensor malfunction.")

def resolve_conflict_with_llm(metric: str, val_oura: float, val_garmin: float, context: str) -> DataRepair:
    return client.chat.completions.create(
        model="gpt-4o",
        response_model=DataRepair,
        messages=[
            {"role": "system", "content": "You are a specialized bio-data engineer."},
            {"role": "user", "content": f"Conflict in {metric}: Oura says {val_oura}, Garmin says {val_garmin}. Context: {context}"},
        ],
    )
```
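It also pays to have a deterministic fallback for when the API is unreachable, or as a cheap baseline to compare the LLM against. The helper below is a hypothetical sketch, not part of Instructor or the article's pipeline; the 15 BPM threshold and the "prefer Garmin" rule are illustrative assumptions.

```python
# Hypothetical rule-based fallback; mirrors the DataRepair fields
# without calling an LLM. Threshold and tie-break rule are assumptions.
def resolve_conflict_heuristic(metric: str, val_oura: float, val_garmin: float) -> dict:
    spread = abs(val_oura - val_garmin)
    if spread <= 15:
        # Readings roughly agree: average them
        return {
            "resolved_value": (val_oura + val_garmin) / 2,
            "reasoning": "Readings agree within 15 BPM; averaged.",
            "is_outlier": False,
        }
    # Large spread: default to Garmin, matching the pipeline's fallback
    return {
        "resolved_value": val_garmin,
        "reasoning": "Spread exceeds 15 BPM; defaulted to Garmin.",
        "is_outlier": True,
    }
```

Returning the same three fields as `DataRepair` means the caller can swap the heuristic in wherever the LLM resolver times out.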
Step 2: Time-Series Alignment with Pandas
First, we need to get both datasets onto the same temporal grid. Garmin might log every second, while Oura logs every 5 minutes.
```python
import pandas as pd

def align_data(oura_df: pd.DataFrame, garmin_df: pd.DataFrame) -> pd.DataFrame:
    # Parse as UTC, then drop the tz info so both frames share a naive UTC grid.
    # (utc=True handles mixed offsets; a bare tz_localize(None) raises on
    # timestamps that are already tz-naive.)
    oura_df['timestamp'] = pd.to_datetime(oura_df['ts'], utc=True).dt.tz_localize(None)
    garmin_df['timestamp'] = pd.to_datetime(garmin_df['ts'], utc=True).dt.tz_localize(None)

    # As-of join: pair each Oura reading with the nearest Garmin reading
    # within a 2-minute window; unmatched rows get NaN
    merged = pd.merge_asof(
        oura_df.sort_values('timestamp'),
        garmin_df.sort_values('timestamp'),
        on='timestamp',
        direction='nearest',
        tolerance=pd.Timedelta('2min'),
    )
    return merged

# Example of finding a "conflict":
# if the heart-rate difference exceeds 15 BPM, we flag the row for the LLM.
```
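To make that flagging rule concrete, here is a toy merged frame (the values are made up; the `hr_oura`/`hr_garmin` column names match the ones used in Step 3):

```python
import pandas as pd

# Toy merged frame: one row where the devices clearly disagree
df = pd.DataFrame({
    "hr_oura":   [62, 58, 61],
    "hr_garmin": [64, 140, 59],   # 140 while Oura says 58: a conflict
})

# Flag rows where the devices disagree by more than 15 BPM
df["conflict"] = (df["hr_oura"] - df["hr_garmin"]).abs() > 15
print(df["conflict"].tolist())  # → [False, True, False]
```

Only the one flagged row would be routed to the LLM; the other two take the cheap interpolation path.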
Step 3: Scaling the "Repair" with Dask
Calling an LLM for every single row is expensive and slow. Instead, we use Dask to parallelize across partitions and only send the flagged "conflict rows" to the model.
```python
import dask.dataframe as dd

def process_chunk(df: pd.DataFrame) -> pd.DataFrame:
    # Flag conflicts: heart-rate readings more than 15 BPM apart
    df['conflict'] = (df['hr_oura'] - df['hr_garmin']).abs() > 15

    def repair_row(row):
        # Only pay for an LLM call where the devices disagree
        if row['conflict']:
            res = resolve_conflict_with_llm(
                "HeartRate", row['hr_oura'], row['hr_garmin'],
                f"Activity level: {row['activity_type']}",
            )
            return res.resolved_value
        return row['hr_garmin']  # Default to Garmin when readings agree

    df['hr_cleaned'] = df.apply(repair_row, axis=1)
    return df

# Convert to Dask and run the repair across partitions in parallel
# dask_df = dd.from_pandas(merged_df, npartitions=4)
# clean_df = dask_df.map_partitions(process_chunk).compute()
```
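One more cost saver worth considering: the same conflict tends to recur across days, so memoizing on rounded values means you pay for each unique conflict only once. A sketch using the standard library's `functools.lru_cache`; the placeholder resolver and the whole-BPM rounding granularity are assumptions for illustration.

```python
from functools import lru_cache

@lru_cache(maxsize=4096)
def cached_resolve(metric: str, val_oura: float, val_garmin: float, context: str) -> float:
    # In production this body would call resolve_conflict_with_llm(...).resolved_value;
    # a cheap stand-in here keeps the caching behavior visible.
    return (val_oura + val_garmin) / 2

def resolve_rounded(metric, val_oura, val_garmin, context):
    # Round to whole BPM so near-identical conflicts share one cache entry
    return cached_resolve(metric, round(val_oura), round(val_garmin), context)

resolve_rounded("HeartRate", 58.2, 140.1, "running")
resolve_rounded("HeartRate", 57.9, 139.8, "running")  # rounds to the same key: cache hit
print(cached_resolve.cache_info().misses)  # → 1
```

With real LLM calls behind the cache, that second lookup is free instead of another round-trip to the API.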
Why This Matters: The Biohacking Context
Data cleaning isn't just about deleting NaN values anymore. In the era of LLMs, we can perform Semantic Data Cleaning.
Imagine your Garmin registers a spike in stress because you were watching a horror movie, but your Oura Ring knows your body temperature is normal. A hard-coded script can't distinguish that; an LLM backed by a Pydantic schema can.
If you are looking for more production-ready examples of how to build these types of agents, specifically for personal health optimization, I highly recommend browsing the technical articles at wellally.tech/blog. They cover the intersection of AI, health data, and robust software engineering in much more detail.
Conclusion
By combining the structural power of Pandas with the reasoning capabilities of LLMs (via Instructor), we transform a messy data-engineering headache into a clean, high-fidelity stream of insights.
What's next?
- Try adding a "Confidence Score" to your Pydantic model.
- Use Dask to process years of historical data.
- Let me know in the comments: How do you handle sensor conflicts in your projects?
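On that first follow-up: adding a confidence score is a one-field change to the schema. A sketch below; the `confidence` field name and its 0-1 bounds are my assumptions, not part of the original model, but Pydantic's `ge`/`le` constraints make whatever the LLM reports machine-checkable.

```python
from pydantic import BaseModel, Field

class DataRepairWithConfidence(BaseModel):
    resolved_value: float = Field(..., description="The corrected value for the metric.")
    reasoning: str = Field(..., description="Why this value was chosen.")
    is_outlier: bool = Field(..., description="Whether the original point was a malfunction.")
    # Assumed field: validation rejects anything outside [0, 1]
    confidence: float = Field(..., ge=0.0, le=1.0,
                              description="Self-reported confidence in the repair, 0 to 1.")

repair = DataRepairWithConfidence(
    resolved_value=62.0, reasoning="Oura matches the resting context.",
    is_outlier=True, confidence=0.9,
)
print(repair.confidence)  # → 0.9
```

Downstream, low-confidence repairs can be routed to a human-review queue instead of landing silently in the clean dataset.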
Happy coding!