Skip to main content
The following examples demonstrate end-to-end usage of the DataLinks Python SDK across a range of common scenarios. Each example is self-contained and can be run directly after configuring your environment variables.

Direct Ingestion

Demonstrates the simplest possible ingestion flow: load pre-structured rows from a JSON file and push them to DataLinks without any inference pipeline or entity resolution. Use this pattern when your data is already in a clean, tabular format and requires no AI-assisted transformation. Components covered: DLConfig, DataLinksAPI, create_space, ingest, query_data
"""
Direct ingestion example using data/pgproducts.json.

Ingests pre-structured rows directly without any inference pipeline or
entity resolution — the data is stored as-is.
"""
import json
import logging
from pprint import pformat

import datalinks
from datalinks.api import DLConfig


def main():
    """Ingest pre-structured rows from data/pgproducts.json as-is (no pipeline)."""
    logging.basicConfig(level=logging.INFO)

    # Target space: namespace + object name select where the rows are stored.
    cfg = DLConfig.from_env()
    cfg.namespace = "pg"
    cfg.objectname = "products_direct"

    api = datalinks.api.DataLinksAPI(cfg)
    api.create_space(is_private=True)

    jsonfile = "data/pgproducts.json"
    logging.info("Loading %s", jsonfile)
    with open(jsonfile) as fh:
        rows = json.load(fh)["rows"]

    logging.info("Ingesting %d rows", len(rows))
    outcome = api.ingest(data=rows)

    logging.info(
        "Ingestion result: %d succeeded, %d failed",
        len(outcome.successful),
        len(outcome.failed),
    )

    stored = api.query_data()
    logging.info("Ingested data:\n%s", pformat(stored))


if __name__ == "__main__":
    main()

JSON Ingestion with Pipeline and Entity Resolution

Demonstrates how to ingest structured JSON data using a ProcessStructured pipeline step and ExactMatch entity resolution. The pipeline instructs DataLinks to derive tabular rows from the JSON "rows" key, and entity resolution deduplicates records by exact field matching. Components covered: DLConfig, DataLinksAPI, Pipeline, ProcessStructured, MatchTypeConfig, ExactMatch, ingest, query_data
import json
import logging
from pprint import pformat

import datalinks
from datalinks.api import DLConfig
from datalinks.links import EntityResolutionTypes, MatchTypeConfig, ExactMatch
from datalinks.pipeline import Pipeline, ProcessStructured


def main():
    """Ingest structured JSON through a pipeline step with entity resolution."""
    logging.basicConfig(level=logging.INFO)

    # Namespace/object name vary per example, so set them explicitly here.
    cfg = DLConfig.from_env()
    cfg.namespace = "pg"
    cfg.objectname = "products"

    # Alternatively, build the configuration inline:
    # cfg = DLConfig(
    #     host="http://localhost:9001",
    #     apikey="",  # your DataLinks API key
    #     index="tests",
    #     namespace="pg",
    #     objectname="products",
    # )

    api = datalinks.api.DataLinksAPI(cfg)
    api.create_space(is_private=True)  # private is the default

    jsonfile = "data/pgproducts.json"
    logging.info("Loading json data in %s", jsonfile)
    with open(jsonfile) as fh:
        payload = json.load(fh)

    # The data is already tabular; derive rows from the "rows" key.
    pipeline = Pipeline(
        ProcessStructured(derive_from="rows")
    )

    # Deduplicate records whose fields match exactly.
    resolution = MatchTypeConfig(ExactMatch())

    outcome = api.ingest(
        data=[payload],  # supports multiple files
        inference_steps=pipeline,
        entity_resolution=resolution,
        batch_size=0,  # default (no file batching)
    )

    logging.info(
        "Ingestion result:"
        "\nSuccessfully ingested %d dataset(s)."
        "\nFailed %d dataset(s).",
        len(outcome.successful),
        len(outcome.failed),
    )

    stored = api.query_data(
        model="gpt-4.1-nano-2025-04-14",
        provider="openai",
    )
    logging.info("Ingested data:%s", pformat(stored))


if __name__ == '__main__':
    main()

Tabular Inference from Unstructured Text

Demonstrates a full AI-powered inference pipeline that transforms raw unstructured text into a structured table. The three-step pipeline uses ProcessUnstructured to extract an initial table from free-form text, Normalize to map columns to a target schema, and Validate to verify row integrity — all powered by an LLM. Components covered: DLConfig, DataLinksAPI, Pipeline, ProcessUnstructured, Normalize, NormalizeModes, Validate, ValidateModes, MatchTypeConfig, ExactMatch, ingest, query_data
import logging
from pprint import pformat

import datalinks
from datalinks.api import DLConfig
from datalinks.links import MatchTypeConfig, ExactMatch
from datalinks.pipeline import Pipeline, ProcessUnstructured, Normalize, NormalizeModes, Validate, ValidateModes


def main():
    """Infer a structured table from unstructured text via a three-step pipeline."""
    logging.basicConfig(level=logging.DEBUG)

    # Namespace/object name vary per example, so set them explicitly here.
    cfg = DLConfig.from_env()
    cfg.namespace = "cinema"
    cfg.objectname = "awards"

    # Alternatively, build the configuration inline:
    # cfg = DLConfig(
    #    host="http://localhost:9001",
    #    apikey="",  # your DataLinks API key
    #    index="tests",
    #    namespace="pg",
    #    objectname="products"
    # )

    api = datalinks.api.DataLinksAPI(cfg)
    api.create_space(is_private=True)  # private is the default

    textfile = "data/movies.txt"
    logging.info("Loading text in %s", textfile)
    with open(textfile) as fh:
        payload = {"text": fh.read()}

    pipeline = Pipeline(
        # Step 1: infer an initial table from the free-form text.
        ProcessUnstructured(
            derive_from="text",
            helper_prompt="If you find a numeric field use only the value and omit the rest.",
            model="gpt-4.1-mini-2025-04-14",
            provider="openai",
        ),
        # Step 2: map the inferred columns onto the target schema.
        Normalize(
            target_cols={
                "Name": "the actor/actress name",
                "Titles": "the list of notable films where the actor was in",
                "Oscars": "the number of oscars won",
            },
            mode=NormalizeModes.ALL_IN_ONE,
            model="gpt-4.1-mini-2025-04-14",
            provider="openai",
        ),
        # Step 3: verify row integrity for the listed columns.
        Validate(
            mode=ValidateModes.ROWS,
            columns=["Name", "Titles", "Oscars"],
            model="gpt-4.1-mini-2025-04-14",
            provider="openai",
        ),
    )

    resolution = MatchTypeConfig(ExactMatch())

    outcome = api.ingest(
        data=[payload],  # supports multiple files
        inference_steps=pipeline,
        entity_resolution=resolution,
        max_attempts=1,
        batch_size=0,  # default (no file batching)
    )

    logging.info(
        "Ingestion result:"
        "\nSuccessfully ingested %d dataset(s)."
        "\nFailed %d dataset(s).",
        len(outcome.successful),
        len(outcome.failed),
    )

    stored = api.query_data(
        model="gpt-4.1-mini-2025-04-14",
        provider="openai",
        include_metadata=True,
    )
    logging.info("Ingested data:\n%s", pformat(stored))


if __name__ == '__main__':
    main()

Multipart Upload

Demonstrates how to upload large files to DataLinks using the multipart upload API. The three-phase flow — prepare, upload, finish — streams the file in chunks directly to presigned S3 URLs, avoiding memory constraints for large datasets. If any part fails, the upload session is aborted to free server-side resources. Components covered: DLConfig, DataLinksAPI, prepare_multipart_upload, finish_multipart_upload, wait_for_ingestion, abort_multipart_upload
"""
Multipart upload example using data/pgproducts_mp.json (~10 MB, 2 parts).

Multipart upload is the recommended approach for large files. The flow is:
  1. Prepare  — DataLinks allocates an upload session and returns presigned S3 URLs
                and the server-side partSize to use when splitting the file.
  2. Upload   — Each file chunk (sized to partSize) is PUT directly to its presigned
                URL; S3 returns an ETag per part.
  3. Finish   — DataLinks assembles the parts and triggers ingestion.

If anything goes wrong during upload, abort is called to clean up the partial upload.
"""
import logging
import os

import requests

import datalinks
from datalinks.api import DLConfig


def upload_multipart(filepath: str):
    """Upload *filepath* to DataLinks via the multipart API and wait for ingestion.

    Flow: prepare (allocate a session and presigned URLs), upload each chunk
    with an HTTP PUT, then finish (the server assembles the parts and queues
    ingestion). On any upload failure the session is aborted so the server can
    free the partial upload.

    :param filepath: path to the local file to upload.
    :raises Exception: re-raises any failure from a part upload after aborting.
    """
    dl_config = DLConfig.from_env()
    dl_config.namespace = "pg_multipart"
    dl_config.objectname = "products"

    dlapi = datalinks.api.DataLinksAPI(dl_config)
    dlapi.create_space(is_private=True)

    filename = os.path.basename(filepath)
    size = os.path.getsize(filepath)
    # Fixed: the original logged the literal "(unknown)" instead of the file name.
    logging.info(f"Preparing multipart upload for '{filename}' ({size:,} bytes)")

    prepare = dlapi.prepare_multipart_upload(filename, size)
    upload_id = prepare["uploadId"]
    key = prepare["key"]
    # The server dictates the chunk size; the file must be split accordingly.
    part_size = prepare["partSize"]
    presigned_urls = [entry["url"] for entry in prepare["presignedUrls"]]
    logging.info(
        f"Upload session ready: {len(presigned_urls)} part(s) of {part_size:,} bytes, "
        f"uploadId={upload_id}"
    )

    parts = []
    part_num = 1
    try:
        with open(filepath, "rb") as f:
            for part_num, url in enumerate(presigned_urls, start=1):
                chunk = f.read(part_size)
                if not chunk:
                    # File exhausted before all presigned URLs were used; stop early.
                    break
                logging.info(
                    f"Uploading part {part_num}/{len(presigned_urls)} ({len(chunk):,} bytes)"
                )
                response = requests.put(url, data=chunk)
                response.raise_for_status()
                # S3 returns the part's (quoted) ETag; it is required to finish the upload.
                etag = response.headers["ETag"].strip('"')
                parts.append({"partNumber": part_num, "etag": etag})
                logging.info(f"Part {part_num} uploaded, ETag={etag}")
    except Exception as e:
        # Abort frees the server-side resources held by the partial upload.
        logging.error(f"Upload failed on part {part_num}: {e} — aborting")
        dlapi.abort_multipart_upload(upload_id, key)
        raise

    logging.info("All parts uploaded, finishing ingestion")
    result = dlapi.finish_multipart_upload(upload_id, key, parts, name=filename)
    ingestion_id = result["id"]
    logging.info(f"Ingestion queued: id={ingestion_id}")

    # Block until DataLinks reports a terminal status for this ingestion.
    final = dlapi.wait_for_ingestion(ingestion_id)
    logging.info(
        f"Ingestion finished: status={final.get('status')!r}, "
        f"rows={final.get('processedRows')}, message={final.get('statusMessage')!r}"
    )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    upload_multipart("data/pgproducts_mp.json")

Interactive Assistant

Demonstrates how to build an interactive command-line assistant that answers natural language questions about your data using the ask streaming API. The example handles each AskEvent type — plan, step, answer, and error — and renders responses using the rich library for a polished terminal experience. Components covered: DLConfig, DataLinksAPI, AskEvent, ask (streaming)
# NOTE:
# This example creates an interactive CLI to ask natural language questions about your data.

import datalinks
from datalinks.api import DLConfig, AskEvent
from rich.console import Console
from rich.markdown import Markdown
from rich.text import Text

console = Console()


def handle_event(event: AskEvent) -> None:
    """Render one streaming AskEvent (plan, step, answer, or error) to the terminal."""
    kind = event.type
    payload = event.data

    if kind == "plan":
        console.print()
        for step in payload.get("steps", []):
            console.print(f"  {step}", style="dim")
        return

    if kind == "step":
        idx = payload.get("index", 0)
        reasoning = payload.get("reasoning", "")
        query = payload.get("query", "")
        data = payload.get("data", [])
        console.print(f"  [{idx + 1}] {reasoning}", style="dim")
        if query:
            console.print(f"      query: {query}", style="dim italic")
        if data:
            console.print(f"      → {len(data)} record(s) retrieved", style="dim")
        return

    if kind == "answer":
        console.print()
        console.print(Markdown(payload.get("response", "")))
        console.print()
        return

    if kind == "error":
        message = payload.get("message", "Unknown error")
        console.print(f"\nError: {message}\n", style="bold red")


def ask_loop(dlapi: datalinks.api.DataLinksAPI, namespace: str) -> None:
    """Prompt for questions on stdin and stream each answer until the user exits."""
    banner = Text.assemble(("DataLinks Assistant", "bold cyan"), " ", (f"({namespace})", "dim"))
    console.print(banner)
    console.print("Type your question and press Enter. Type 'exit' to quit.\n", style="dim")

    while True:
        try:
            question = console.input("[bold yellow]You:[/bold yellow] ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C ends the session cleanly.
            console.print()
            return

        if not question:
            continue
        if question.lower() == "exit":
            return

        console.print("[bold cyan]DataLinks:[/bold cyan]", end=" ")
        for event in dlapi.ask(question):
            handle_event(event)


def main() -> None:
    """Entry point: resolve the namespace, then start the interactive ask loop."""
    cfg = DLConfig.from_env()

    # Prompt for a namespace when the environment left the sentinel value in place.
    if cfg.namespace == "namespace-notset":
        try:
            cfg.namespace = console.input("Namespace: ").strip()
        except (EOFError, KeyboardInterrupt):
            console.print()
            return

    # Object name is cleared here — presumably so questions target the whole
    # namespace rather than one object; confirm against the ask API docs.
    cfg.objectname = ""

    dlapi = datalinks.api.DataLinksAPI(cfg)
    ask_loop(dlapi, cfg.namespace)


if __name__ == "__main__":
    main()