import logging
from pprint import pformat
import datalinks
from datalinks.api import DLConfig
from datalinks.links import MatchTypeConfig, ExactMatch
from datalinks.pipeline import Pipeline, ProcessUnstructured, Normalize, NormalizeModes, Validate, ValidateModes
def main():
    """Ingest an unstructured text file into DataLinks, then query it back.

    Demonstrates the full pipeline: infer a table from raw text, normalize
    it onto a fixed set of columns, validate the rows, resolve entities by
    exact match, ingest, and finally query the ingested data.
    """
    logging.basicConfig(level=logging.DEBUG)

    # Configure from the environment; namespace/object vary with each
    # example, so they are set explicitly here.
    dl_config = DLConfig.from_env()
    dl_config.namespace = "cinema"
    dl_config.objectname = "awards"
    # OR configure everything explicitly:
    # dl_config = DLConfig(
    #     host="http://localhost:9001",
    #     apikey="",  # your DataLinks API key
    #     index="tests",
    #     namespace="pg",
    #     objectname="products"
    # )

    dlapi = datalinks.api.DataLinksAPI(dl_config)
    dlapi.create_space(is_private=True)  # default

    textfile = "data/movies.txt"
    logging.info("Loading text in %s", textfile)
    # Explicit encoding so the read does not depend on the platform default.
    with open(textfile, encoding="utf-8") as f:
        source_doc = {"text": f.read()}

    steps = Pipeline(
        # Infer a table from the unstructured text.
        ProcessUnstructured(
            derive_from="text",
            helper_prompt="If you find a numeric field use only the value and omit the rest.",
            model="gpt-4.1-mini-2025-04-14",
            provider="openai"
        ),
        # Map the inferred columns onto the target schema in a single pass.
        Normalize(
            target_cols={
                "Name": "the actor/actress name",
                "Titles": "the list of notable films where the actor was in",
                "Oscars": "the number of oscars won"
            },
            mode=NormalizeModes.ALL_IN_ONE,
            model="gpt-4.1-mini-2025-04-14",
            provider="openai"
        ),
        # Row-level validation of the normalized columns.
        Validate(
            mode=ValidateModes.ROWS,
            columns=["Name", "Titles", "Oscars"],
            model="gpt-4.1-mini-2025-04-14",
            provider="openai"
        )
    )
    entity_resolution = MatchTypeConfig(ExactMatch())

    result = dlapi.ingest(
        data=[source_doc],  # supports multiple files
        inference_steps=steps,
        entity_resolution=entity_resolution,
        max_attempts=1,
        batch_size=0  # default (no file batching)
    )
    # Lazy %-style args: the message is only formatted if the level is enabled.
    logging.info(
        "Ingestion result:"
        "\nSuccessfully ingested %d dataset(s)."
        "\nFailed %d dataset(s).",
        len(result.successful),
        len(result.failed),
    )

    # Distinct name: do not shadow the ingestion payload with the query result.
    queried = dlapi.query_data(
        model="gpt-4.1-mini-2025-04-14",
        provider="openai",
        include_metadata=True
    )
    logging.info("Ingested data:\n%s", pformat(queried))
# Script entry point: run the example only when executed directly,
# not when imported as a module.
if __name__ == '__main__':
    main()