Ayat Saadat

ayat saadati — Complete Guide

Saadati-Connect: The Universal Data Integration Fabric

You know, in my years wrangling data, one thing has always been a constant pain point: getting data from here to there and then making sure it actually talks to that other thing over there. We've all been there, right? Custom scripts, endless API client implementations, a whole lot of head-scratching. That's precisely why a project like Saadati-Connect genuinely excites me.

Born from a need to simplify the often-convoluted world of data integration, Saadati-Connect isn't just another library. It's an opinionated toolkit that abstracts away the boilerplate of connecting to disparate data sources (think databases, REST APIs, GraphQL endpoints, and even those quirky legacy systems) and then gives you a fluid way to transform and orchestrate that data. It's about letting you focus on the logic of your data flow, not the mechanics of the connection.

This project, spearheaded by the insightful work of Ayat Saadati, aims to be that connective tissue that brings your data ecosystem together without drowning you in complexity. It's Python-based, which, let's be honest, is practically the lingua franca for data folks these days.

Features

What makes Saadati-Connect a standout? It's the thoughtful combination of flexibility and convention. Here's a quick rundown of what you can expect:

  • Unified Interface: Connect to various sources (SQL, NoSQL, REST, GraphQL, CSV, etc.) through a consistent, easy-to-understand API. No more learning a new library for every data type!
  • Built-in Transformers: Common data cleaning and transformation utilities are baked right in, saving you from writing repetitive code.
  • Schema Inference & Validation: Attempts to understand your data's structure and can enforce validation rules, catching issues early.
  • Extensible Architecture: Need to connect to something truly exotic? The framework is designed for you to easily plug in your own custom connectors and transformers.
  • Asynchronous Support: For those high-throughput scenarios, Saadati-Connect plays nice with asyncio, so I/O-bound pipelines can keep several requests in flight instead of waiting on each one in turn.
  • Robust Error Handling: Sensible defaults and clear mechanisms for dealing with common data integration pitfalls.
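
To make the "unified interface" and "extensible architecture" ideas a bit more concrete, here's a minimal sketch of the pattern in plain Python. Everything in it (`BaseConnector`, `CSVConnector`, `ListConnector`, `collect`) is a hypothetical name I made up for illustration; it is not Saadati-Connect's actual class hierarchy, just the general shape such a design tends to take.

```python
import csv
import io
from abc import ABC, abstractmethod


class BaseConnector(ABC):
    """Hypothetical common interface: every source returns a list of dicts."""

    @abstractmethod
    def fetch_data(self) -> list[dict[str, str]]:
        """Return every record from the source."""


class CSVConnector(BaseConnector):
    """Hypothetical custom connector that reads records from CSV text."""

    def __init__(self, csv_text: str) -> None:
        self._csv_text = csv_text

    def fetch_data(self) -> list[dict[str, str]]:
        # DictReader uses the first line as column names.
        reader = csv.DictReader(io.StringIO(self._csv_text))
        return [dict(row) for row in reader]


class ListConnector(BaseConnector):
    """Hypothetical connector over records that are already in memory."""

    def __init__(self, rows: list[dict[str, str]]) -> None:
        self._rows = rows

    def fetch_data(self) -> list[dict[str, str]]:
        # Copy each record so callers cannot mutate the source rows.
        return [dict(row) for row in self._rows]


def collect(connectors: list[BaseConnector]) -> list[dict[str, str]]:
    """Pull records from every connector without caring what kind it is."""
    records: list[dict[str, str]] = []
    for connector in connectors:
        records.extend(connector.fetch_data())
    return records
```

The point of the pattern is that `collect` never needs to know which kind of source it is talking to, so adding a new source means writing one new subclass and nothing else.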

Installation

Getting Saadati-Connect up and running is as straightforward as you'd hope for any modern Python library. We're talking pip, of course.

First, I always recommend using a virtual environment. It keeps your project dependencies clean and prevents conflicts. If you're not doing this yet, honestly, you should start today.

# Create a virtual environment
python3 -m venv saadati-env

# Activate it
# On macOS/Linux:
source saadati-env/bin/activate
# On Windows:
.\saadati-env\Scripts\activate

# Now, install Saadati-Connect
pip install saadati-connect

Important Note: Depending on the specific database or API connectors you plan to use, you might need to install optional dependencies. For instance, if you're working with PostgreSQL, you'd install the postgres extra (or add a driver such as psycopg2-binary yourself):

pip install "saadati-connect[postgres]" # Or specific packages like psycopg2-binary
pip install "saadati-connect[mysql]"    # Or pymysql
pip install "saadati-connect[mongo]"    # Or pymongo
pip install "saadati-connect[graphql]"  # Or gql

You can even install all common connectors if you're feeling adventurous:

pip install "saadati-connect[all]"
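
If you're not sure which optional drivers made it into your environment, a quick check with the standard library can save some head-scratching. This is a small sketch, assuming the extras map to the driver packages named in the comments above (note that psycopg2-binary is imported as `psycopg2`); the `missing_modules` helper is my own invention, not something Saadati-Connect ships.

```python
from importlib.util import find_spec

# Extra name -> import name of the driver mentioned in the install comments.
# This mapping is an assumption based on those comments.
OPTIONAL_DRIVERS = {
    "postgres": "psycopg2",
    "mysql": "pymysql",
    "mongo": "pymongo",
    "graphql": "gql",
}


def missing_modules(modules: dict[str, str]) -> list[str]:
    """Return the keys whose module cannot be found in this environment."""
    return [name for name, module in modules.items() if find_spec(module) is None]


if __name__ == "__main__":
    missing = missing_modules(OPTIONAL_DRIVERS)
    print("Missing optional drivers:", ", ".join(missing) or "none")
```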

Quick Start

Let's dive into some practical examples. I find that getting my hands dirty with a few lines of code is the fastest way to grasp a new tool's power.

Connecting to a Relational Database

Imagine you have a simple SQLite database with some user data. Here's how Saadati-Connect makes fetching that data a breeze.

# db_example.py
from saadati_connect.connectors import SQLiteConnector
from saadati_connect.models import ConnectionConfig

# First, let's pretend we have a database.
# In a real scenario, this DB would already exist.
# For this example, we'll create a small file-based one. An in-memory
# database (":memory:") would not work here: it is destroyed as soon as
# its connection closes, so the connector would open a fresh, empty one.
import sqlite3
conn = sqlite3.connect("demo_users.db")
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS users")  # Start clean on every run
cursor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
cursor.execute("INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com')")
cursor.execute("INSERT INTO users (name, email) VALUES ('Bob', 'bob@example.com')")
conn.commit()
conn.close() # The data lives in the file, so the connector can open its own connection

# Now, use Saadati-Connect
db_config = ConnectionConfig(
    type="sqlite",
    uri="sqlite:///demo_users.db" # Relative path to the file created above
)

# Instantiate the connector
sqlite_connector = SQLiteConnector(db_config)

try:
    # Execute a simple query
    users = sqlite_connector.fetch_data("SELECT id, name, email FROM users")
    print("Fetched Users:")
    for user in users:
        print(f"  ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")

    # Let's insert some data
    new_user_id = sqlite_connector.execute_query(
        "INSERT INTO users (name, email) VALUES (?, ?)",
        params=('Charlie', 'charlie@example.com'),
        fetch_id=True # Try to fetch the last inserted ID
    )
    if new_user_id:
        print(f"\nInserted new user with ID: {new_user_id}")

    # Verify insertion
    all_users = sqlite_connector.fetch_data("SELECT id, name, email FROM users")
    print("\nAll Users after insertion:")
    for user in all_users:
        print(f"  ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")

except Exception as e:
    print(f"An error occurred: {e}")
finally:
    # Saadati-Connect handles closing, but good practice to be explicit if you manage raw connections
    pass

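
For comparison, here's roughly what the same fetch-and-insert round trip looks like with nothing but the standard library's sqlite3 module. Treat it as a sketch of the kind of boilerplate a connector wraps, not as a description of how SQLiteConnector is implemented; the helper names `fetch_rows` and `insert_row` are made up for this example.

```python
import sqlite3
from typing import Any


def fetch_rows(conn: sqlite3.Connection, sql: str, params: tuple = ()) -> list[dict[str, Any]]:
    """Run a SELECT and return rows as plain dicts keyed by column name."""
    cursor = conn.execute(sql, params)
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


def insert_row(conn: sqlite3.Connection, sql: str, params: tuple) -> int:
    """Run a parameterized INSERT, commit, and return the new row's ID."""
    cursor = conn.execute(sql, params)
    conn.commit()
    return cursor.lastrowid


def demo() -> list[dict[str, Any]]:
    # A single connection is used throughout, so ":memory:" is safe here.
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
        insert_sql = "INSERT INTO users (name, email) VALUES (?, ?)"
        insert_row(conn, insert_sql, ("Alice", "alice@example.com"))
        insert_row(conn, insert_sql, ("Charlie", "charlie@example.com"))
        return fetch_rows(conn, "SELECT id, name, email FROM users ORDER BY id")
    finally:
        conn.close()


if __name__ == "__main__":
    for user in demo():
        print(user)
```

Two details worth noticing: the `?` placeholders keep values out of the SQL string, and the in-memory database only works because the table is created and queried over the same connection.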

Interacting with a REST API

Connecting to web APIs can be a minefield of authentication, pagination, and rate limits. Saadati-Connect streamlines this. Let's hit a public API, say, JSONPlaceholder for some fake posts.

# api_example.py
from saadati_connect.connectors import RESTConnector
from saadati_connect.models import ConnectionConfig
import asyncio

# Configure the API endpoint
api_config = ConnectionConfig(
    type="rest",
    uri="https://jsonplaceholder.typicode.com"
)

rest_connector = RESTConnector(api_config)

async def fetch_and_process_posts():
    print("Fetching posts from JSONPlaceholder...")
    try:
        # Fetch posts from the /posts endpoint
        posts = await rest_connector.get("/posts", params={"_limit": 5}) # Get first 5 posts
        print(f"Fetched {len(posts)} posts.")
        for post in posts:
            print(f"  Post ID: {post['id']}, Title: {post['title'][:50]}...")

        # Fetch a single post
        single_post = await rest_connector.get("/posts/1")
        print(f"\nFetched single post (ID 1): {single_post['title']}")

        # Let's simulate posting data (though JSONPlaceholder won't actually save it)
        new_post_data = {
            "title": "Saadati-Connect Demo Post",
            "body": "This is a test post created using Saadati-Connect. It's pretty neat!",
            "userId": 1
        }
        created_post = await rest_connector.post("/posts", data=new_post_data)
        print(f"\nSimulated creation of new post (ID: {created_post.get('id')}): {created_post['title']}")

    except Exception as e:
        print(f"An error occurred during API interaction: {e}")

if __name__ == "__main__":
    asyncio.run(fetch_and_process_posts())

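
Since pagination came up, here's a minimal sketch of how you might walk through a paged endpoint with asyncio. It's deliberately library-agnostic: `paginate` takes any async function that returns one page, and `fake_fetch_page` stands in for a real HTTP call so the example runs offline. Both names are hypothetical, and the "stop on a short page" rule is an assumption that holds for many APIs but not all of them.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable

# A page fetcher takes (page_number, page_size) and returns that page's items.
FetchPage = Callable[[int, int], Awaitable[list[dict[str, Any]]]]


async def paginate(
    fetch_page: FetchPage, page_size: int = 5, max_pages: int = 100
) -> AsyncIterator[dict[str, Any]]:
    """Yield items page by page until a short or empty page signals the end."""
    for page in range(1, max_pages + 1):
        items = await fetch_page(page, page_size)
        for item in items:
            yield item
        if len(items) < page_size:
            break


async def fake_fetch_page(page: int, page_size: int) -> list[dict[str, Any]]:
    """Stand-in for a real HTTP call: serves 12 fake posts in pages."""
    posts = [{"id": i, "title": f"Post {i}"} for i in range(1, 13)]
    start = (page - 1) * page_size
    await asyncio.sleep(0)  # Yield control, as a real network call would
    return posts[start:start + page_size]


async def collect_all() -> list[dict[str, Any]]:
    """Gather every item from the fake endpoint into one list."""
    return [post async for post in paginate(fake_fetch_page, page_size=5)]


if __name__ == "__main__":
    print(f"Collected {len(asyncio.run(collect_all()))} posts")
```

The `max_pages` cap is there as a safety net, so a misbehaving endpoint that never returns a short page can't keep the loop running forever.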

Simple Data Transformation

Data rarely arrives in the perfect shape you need. Saadati-Connect's DataTransformer class (in the transformers module) helps you clean things up.

# transform_example.py
from saadati_connect.transformers import DataTransformer

# Imagine this data came from a database or API
raw_data = [
    {"Name": "  alice   ", "AGE": 30, "EMAIL": "ALICE@EXAMPLE.COM"},
    {"Name": "Bob", "AGE": "25", "EMAIL": "bob@example.com  "},
    {"Name": "Charlie", "AGE": 40, "EMAIL": "charlie@EXAMPLE.COM"},
    {"Name": "Diana", "AGE": None, "EMAIL": "diana@example.com"},
]

print("Raw Data:")
for item in raw_data:
    print(item)

# Define some transformation rules
transformer = DataTransformer()

# Rule 1: Trim whitespace from strings
transformer.add_rule(
    lambda item: {k: v.strip() if isinstance(v, str) else v for k, v in item.items()},
    description="Trim whitespace from all string values"
)

# Rule 2: Convert 'AGE' to integer, handle None
transformer.add_rule(
    lambda item: {**item, "AGE": int(item["AGE"]) if item.get("AGE") is not None else None},
    description="Convert 'AGE' to integer, preserving None"
)

# Rule 3: Normalize 'EMAIL' to lowercase
transformer.add_rule(
    lambda item: {**item, "EMAIL": item["EMAIL"].lower()},
    description="Convert 'EMAIL' to lowercase"
)

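# Rule 4: Rename 'Name' to 'full_name' and 'AGE' to 'age_years'
# (builds a new dict instead of popping keys, so the input record is not mutated)
transformer.add_rule(
    lambda item: {
        "full_name": item["Name"],
        "age_years": item["AGE"],
        **{k: v for k, v in item.items() if k not in ("Name", "AGE")}
    },
    description="Rename 'Name' to 'full_name' and 'AGE' to 'age_years'"
)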


transformed_data = transformer.transform(raw_data)

print("\nTransformed Data:")
for item in transformed_data:
    print(item)

# You can also inspect the transformation history if needed
print("\nTransformation History:")
for item in transformer.get_history():
    print(f"- {item['description']}")

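
If you're curious how a rule pipeline like this can work, here's a minimal sketch. `MiniTransformer` is a hypothetical stand-in I wrote for illustration; it mirrors the method names used above but makes no claim about how DataTransformer is actually implemented.

```python
from typing import Any, Callable

Record = dict[str, Any]
Rule = Callable[[Record], Record]


class MiniTransformer:
    """Hypothetical stand-in showing the rule-pipeline pattern."""

    def __init__(self) -> None:
        self._rules: list[tuple[Rule, str]] = []

    def add_rule(self, rule: Rule, description: str = "") -> None:
        """Register a rule; rules run in the order they were added."""
        self._rules.append((rule, description))

    def transform(self, records: list[Record]) -> list[Record]:
        """Apply every rule, in order, to a copy of each record."""
        result = []
        for record in records:
            current = dict(record)  # Copy so rules never touch the caller's data
            for rule, _ in self._rules:
                current = rule(current)
            result.append(current)
        return result

    def get_history(self) -> list[dict[str, str]]:
        """Describe the registered rules in application order."""
        return [{"description": description} for _, description in self._rules]


mini = MiniTransformer()
mini.add_rule(
    lambda r: {k: v.strip() if isinstance(v, str) else v for k, v in r.items()},
    "Trim whitespace",
)
mini.add_rule(lambda r: {**r, "EMAIL": r["EMAIL"].lower()}, "Lowercase EMAIL")
cleaned = mini.transform([{"Name": "  alice   ", "EMAIL": "ALICE@EXAMPLE.COM"}])
```

Because each rule receives the output of the previous one, order matters: a rename rule has to come after any rule that still refers to the old key.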

Advanced Usage

This is where Saadati-Connect really starts to shine, letting you build more complex and resilient data pipelines.

Chaining Operations

One of my favorite aspects of well-designed libraries is their composability. Saadati-Connect lets you chain operations, making your data flow incredibly readable and maintainable.


# chained_example.py
from saadati_connect.connectors import SQLiteConnector, RESTConnector
from saadati_connect.models import ConnectionConfig
from saadati_connect.transformers import DataTransformer
import asyncio
import sqlite3

# 1. Set up a dummy SQLite DB in a file. An in-memory database would vanish
#    when this setup connection closes, leaving the connector with no table.
conn = sqlite3.connect("demo_posts.db")
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS posts (id INTEGER PRIMARY KEY, title TEXT, body TEXT, user_id INTEGER)")
conn.commit()
conn.close()

async def run_chained_flow():
    # Configure connectors
    db_config = ConnectionConfig(type="sqlite", uri="sqlite:///demo_posts.db")
    sqlite_connector = SQLiteConnector(db_config)

    api_config = ConnectionConfig(type="rest", uri="https://jsonplaceholder.typicode.com")
    rest_connector = RESTConnector(api_config)

    # Configure transformer
    transformer = DataTransformer()
    transformer.add_rule(lambda item: {
        "id": item.get("id"),
        "title": item.get("title"),
        "body": item.get("body"),
        "user_id": item.get("userId") # Rename 'userId' to 'user_id' for DB
    })

    print("Starting data ingestion flow...")
    try:
        # Step 1: Fetch data from REST API
        print("Fetching posts from API...")
        api_posts = await rest_connector.get("/posts", params={"_limit": 10})

        # Step 2: Transform the fetched data
        print(f"Transforming {len(api_posts)} posts...")
        transformed_posts = transformer.transform(api_posts)

        # Step 3: Insert transformed data into SQLite
        print("Inserting transformed posts into SQLite database...")
        for post in transformed_posts:
            # INSERT OR REPLACE overwrites any existing row with the same ID; if ID is None, SQLite assigns one
            sqlite_connector.execute_query(
                "INSERT OR REPLACE INTO posts (id, title, body, user_id) VALUES (?, ?, ?, ?)",
                params=(post['id'], post['title'], post['body'], post['user_id'])
            )
        print("Data ingestion complete.")

        # Step 4: Verify data in SQLite
        db_posts = sqlite_connector.fetch_data("SELECT id, title, user_id FROM posts ORDER BY id LIMIT 5")
        print("\nPosts currently in SQLite (first 5):")
        for post in db_posts:
            print(f"  ID: {post['id']}, Title: {post['title'][:
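
The flow above inserts posts one statement at a time. For larger batches, a single transaction is usually the safer bet, because either every row lands or none does. Here's a minimal sketch of that load step using only the standard library's sqlite3 module; `load_posts` is a hypothetical helper, not part of Saadati-Connect, and the sample posts are made up.

```python
import sqlite3
from typing import Any


def load_posts(conn: sqlite3.Connection, posts: list[dict[str, Any]]) -> int:
    """Insert or replace all posts in one transaction; return the table's row count."""
    rows = [(p["id"], p["title"], p["body"], p["user_id"]) for p in posts]
    with conn:  # Commits on success, rolls back if any insert raises
        conn.executemany(
            "INSERT OR REPLACE INTO posts (id, title, body, user_id) VALUES (?, ?, ?, ?)",
            rows,
        )
    return conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]


def demo() -> int:
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, body TEXT, user_id INTEGER)"
        )
        posts = [
            {"id": 1, "title": "First", "body": "Hello", "user_id": 1},
            {"id": 2, "title": "Second", "body": "World", "user_id": 1},
            # Same ID as the first post, so it replaces that row.
            {"id": 1, "title": "First (edited)", "body": "Hello again", "user_id": 1},
        ]
        return load_posts(conn, posts)
    finally:
        conn.close()


if __name__ == "__main__":
    print(f"Rows in posts table: {demo()}")
```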
