datalinks package
Submodules
datalinks.api module
class datalinks.api.DLConfig(host, apikey, index, namespace, objectname)
Bases: object
DLConfig class is a configuration container for managing the required settings
to interact with DataLinks. It loads configuration values from environment
variables to provide flexibility across different environments.
This class is designed to simplify the initialization and storage of connection
and namespace details required to communicate with DataLinks.
- Variables:
- host – The host URL for the data layer connection.
- apikey – The API key for authentication with the data layer.
- index – The index name to be used in the data layer operations.
- namespace – The namespace for organizing data in the data layer.
- objectname – The name of the object associated with the configuration. Defaults to an empty string.
host : str
apikey : str
index : str
namespace : str
objectname : str
classmethod from_env(load_dotenv=True)
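As a minimal sketch (assuming the datalinks package is installed; all credential values below are placeholders), a configuration can be built explicitly or loaded from the environment:

```python
from datalinks.api import DLConfig

# Explicit construction; every value here is a placeholder.
config = DLConfig(
    host="https://datalinks.example.com",  # hypothetical host URL
    apikey="YOUR_API_KEY",
    index="my-index",
    namespace="my-namespace",
    objectname="my-dataset",
)

# Or pull the same settings from environment variables (optionally
# reading a .env file first).
config = DLConfig.from_env(load_dotenv=True)
```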
class datalinks.api.AskEvent(type, data)
Bases: object
Represents a single SSE event from the /query/ask streaming endpoint.
- Variables:
- type – Event type, one of plan, step, answer, or error.
- data – Parsed JSON payload for the event.
type : str
data : Dict[str, Any]
class datalinks.api.IngestionResult(successful, failed)
Bases: object
Represents the result of a data ingestion process into DataLinks.
This class is a data structure used to store the results of a data ingestion
operation. It separates the successfully ingested items from the failed ones,
enabling users to track and handle both cases effectively.
- Variables:
- successful – A list of records successfully ingested. Each record is represented as a dictionary.
- failed – A list of records that failed ingestion. Each record is represented as a dictionary.
successful : List[Dict[str, Any]]
failed : List[Dict[str, Any]]
exception datalinks.api.DataLinksRequestError(endpoint, e)
Bases: Exception
class datalinks.api.DataLinksAPI(config=None)
Bases: object
Class for interfacing with the DataLinks API.
Provides methods for ingesting data, managing namespaces, and querying data
from DataLinks. Designed to interact with a configurable
backend, providing flexibility for deployment environments.
- Variables: config – Configuration object containing API key, host, index, namespace, and object name.
config : DLConfig
ingest(data, inference_steps=None, entity_resolution=None, batch_size=0, max_attempts=3)
Ingests data into the namespace by batching the given data and retrying failed batches. Data is sent in chunks (batches), processed through the configured inference steps, and entities are resolved according to the provided configuration. If a batch fails, it is retried up to the maximum number of attempts.
- Parameters:
- data (List[Dict[str, Any]]) – List of dictionaries, where each dictionary represents a data block to be ingested.
- inference_steps (Pipeline | None) – Pipeline of inference steps to apply when processing the data. If None, the data is ingested as is.
- entity_resolution (MatchTypeConfig | None) – Configuration specifying how entity resolution is to be performed.
- batch_size – Number of data blocks per batch. Defaults to the size of the entire dataset if not provided.
- max_attempts – Maximum number of retry attempts for failed batches. Defaults to the constant MAX_INGEST_ATTEMPTS.
- Return type: IngestionResult
- Returns: An IngestionResult object containing lists of successfully ingested data blocks and data blocks that failed ingestion.
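A minimal ingestion sketch (the record field names and batch size are illustrative, not part of the API):

```python
from datalinks.api import DataLinksAPI, DLConfig

# Hypothetical records; the field names are illustrative only.
records = [
    {"name": "Ada Lovelace", "city": "London"},
    {"name": "Alan Turing", "city": "Wilmslow"},
]

api = DataLinksAPI(DLConfig.from_env())
result = api.ingest(records, batch_size=100)

# IngestionResult separates the two outcomes, so failed blocks
# can be inspected or retried individually.
print(f"ingested: {len(result.successful)}, failed: {len(result.failed)}")
for block in result.failed:
    print("failed block:", block)
```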
create_space(is_private=True, data_description='', field_definitions='')
Creates a new space with the specified privacy settings. This function sends a POST request to create a namespace with the given privacy status. Information about the namespace creation is logged, including the HTTP status code and response reason. If the namespace already exists, a warning is logged.
- Parameters: is_private (bool) – Determines whether the created namespace will be private or public.
- Return type: None
- Returns: None
- Raises: HTTPError – If the HTTP request fails due to connectivity issues or server-side problems.
update_infer_definition(data_description, field_definition)
Update the saved inference definition for the configured dataset. The inference definition is used automatically on future ingest calls to guide field extraction and normalization.
- Parameters:
- data_description (str) – Free-text description of the dataset (max 10,000 chars).
- field_definition (str) – Field-level definitions, one per line as field=description (max 10,000 chars).
- Raises: DataLinksRequestError – If the HTTP request fails.
- Return type: None
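A short sketch of the field=description format (the dataset description and field names are illustrative):

```python
from datalinks.api import DataLinksAPI, DLConfig

api = DataLinksAPI(DLConfig.from_env())

# field_definition is one "field=description" pair per line.
field_definition = "\n".join([
    "name=Full name of the person",
    "city=City of residence",
])

api.update_infer_definition(
    data_description="A small registry of historical computer scientists.",
    field_definition=field_definition,
)
```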
infer_dataset_description(sample, model=None, provider=None, current_description=None, current_schema=None)
Ask an agent to infer a data description and field schema from sampled data.
- Parameters:
- sample (Dataset) – A sample of data rows to analyse.
- model (Optional[str]) – LLM model name.
- provider (Optional[str]) – LLM provider (e.g. "openai", "ollama").
- current_description (Optional[str]) – Existing description to refine.
- current_schema (Optional[Dict[str, str]]) – Existing field schema to refine (field → description mapping).
- Returns: Inferred dataDescription and fieldDefinition.
- Return type: Dict
- Raises: DataLinksRequestError – If the HTTP request fails.
update_sort_order(order)
Update the display order of columns for the configured dataset.
- Parameters: order (List[str]) – Ordered list of all column names in the desired sequence.
- Raises: DataLinksRequestError – If the HTTP request fails.
- Return type: None
prepare_multipart_upload(filename, size)
Initiate a multipart upload and receive presigned URLs for each part. Use this for large files. Upload each part directly to its presigned URL, then call finish_multipart_upload() with the returned ETags.
- Parameters:
- filename (str) – Name of the file being uploaded.
- size (int) – File size in bytes.
- Returns: Response containing uploadId, key, and presigned part URLs.
- Return type: Dict
- Raises: DataLinksRequestError – If the HTTP request fails.
finish_multipart_upload(upload_id, key, parts, name=None)
Complete a multipart upload after all parts have been uploaded.
- Parameters:
- upload_id (str) – Upload ID from prepare_multipart_upload().
- key (str) – S3 object key from prepare_multipart_upload().
- parts (List[Dict[str, Any]]) – List of completed parts, each with partNumber (int) and etag (str) as returned by S3.
- name (Optional[str]) – Optional label for the ingestion (e.g. the original filename).
- Returns: Ingestion result from the server.
- Return type: Dict
- Raises: DataLinksRequestError – If the HTTP request fails.
abort_multipart_upload(upload_id, key)
Abort a multipart upload and clean up partial data.
- Parameters:
- upload_id (str) – Upload ID from prepare_multipart_upload().
- key (str) – S3 object key from prepare_multipart_upload().
- Raises: DataLinksRequestError – If the HTTP request fails.
- Return type: None
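The three multipart methods fit together as prepare → PUT each part → finish, with abort as the cleanup path. The sketch below assumes the response field holding the presigned URLs is named "urls" (only uploadId and key are documented), and uses the third-party requests library for the PUTs:

```python
import os

def split_parts(total_size: int, part_size: int) -> list[tuple[int, int, int]]:
    """Return (partNumber, offset, length) tuples covering total_size bytes.
    Part numbers are 1-based, matching the S3 multipart convention."""
    parts = []
    offset, number = 0, 1
    while offset < total_size:
        length = min(part_size, total_size - offset)
        parts.append((number, offset, length))
        offset += length
        number += 1
    return parts

def upload_file(api, path: str, part_size: int = 8 * 1024 * 1024) -> dict:
    """Sketch of the full multipart flow; aborts the upload on any failure."""
    import requests  # third-party; used to PUT each part to its presigned URL

    size = os.path.getsize(path)
    init = api.prepare_multipart_upload(filename=os.path.basename(path), size=size)
    upload_id, key = init["uploadId"], init["key"]
    try:
        completed = []
        with open(path, "rb") as fh:
            # "urls" is an assumed key name for the presigned part URLs.
            for (number, _, length), url in zip(split_parts(size, part_size), init["urls"]):
                resp = requests.put(url, data=fh.read(length))
                resp.raise_for_status()
                completed.append({"partNumber": number, "etag": resp.headers["ETag"]})
        return api.finish_multipart_upload(upload_id, key, completed,
                                           name=os.path.basename(path))
    except Exception:
        api.abort_multipart_upload(upload_id, key)
        raise
```

The ingestion id in the result from finish_multipart_upload() can then be passed to wait_for_ingestion() to block until server-side processing completes.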
list_ingestions(page_size=25)
List ingestion attempts for the configured dataset, most recent first. Each record contains id, status, statusMessage, processedBytes, expectedTotalBytes, processedRows, and attributes.
- Parameters: page_size (int) – Number of records to return (1-100, default 25).
- Returns: List of ingestion attempt dicts, or None on failure.
- Return type: Optional[List[Dict]]
wait_for_ingestion(ingestion_id, poll_interval=5, timeout=1200)
Poll until the given ingestion reaches a terminal status. Polls list_ingestions() every poll_interval seconds until the ingestion with ingestion_id is no longer in a pending/processing state, or until timeout seconds have elapsed.
- Parameters:
- ingestion_id (str) – Ingestion ID returned by finish_multipart_upload().
- poll_interval (int) – Seconds between polls (default 5).
- timeout (int) – Maximum seconds to wait before raising (default 1200).
- Returns: The final ingestion record dict.
- Return type: Dict
- Raises:
- TimeoutError – If timeout is exceeded before a terminal status.
- DataLinksRequestError – If polling requests fail.
get_dataset_info()
Retrieve metadata for the configured dataset. Returns a dict with dataset, metadata, and inferDefinition keys, or None if the request fails.
- Returns: Dataset metadata dict, or None on failure.
- Return type: Optional[Dict]
delete_dataset()
Permanently delete the configured dataset, including all data, links, and metadata. This action is irreversible.
- Raises: DataLinksRequestError – If the HTTP request fails.
- Return type: None
rename_dataset(new_name)
Rename the configured dataset.
- Parameters: new_name (str) – The new dataset name.
- Raises: DataLinksRequestError – If the HTTP request fails.
- Return type: None
clear_dataset()
Remove all data and links from the configured dataset. The dataset itself (metadata, schema) is preserved. This action is irreversible.
- Raises: DataLinksRequestError – If the HTTP request fails.
- Return type: None
add_link(from_namespace, from_dataset, from_column, to_namespace, to_dataset, to_column, match_type, options=None)
Create a manual link between two dataset columns.
- Parameters:
- from_namespace (str) – Source namespace.
- from_dataset (str) – Source dataset name.
- from_column (str) – Source column name.
- to_namespace (str) – Target namespace.
- to_dataset (str) – Target dataset name.
- to_column (str) – Target column name.
- match_type (str) – Match type, either "ExactMatch" or "GeoMatch".
- options (Optional[Dict[str, Any]]) – Optional match configuration (e.g. minDistinct, distance).
- Returns: The created link object, or None on failure.
- Return type: Optional[Dict]
- Raises: DataLinksRequestError – If the HTTP request fails.
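A sketch linking a column to a reference dataset by exact value match (all namespace, dataset, and column names are illustrative):

```python
from datalinks.api import DataLinksAPI, DLConfig

api = DataLinksAPI(DLConfig.from_env())

# Link the "city" column of a people dataset to a reference cities dataset.
link = api.add_link(
    from_namespace="my-namespace",
    from_dataset="people",
    from_column="city",
    to_namespace="reference",
    to_dataset="cities",
    to_column="name",
    match_type="ExactMatch",
    options={"minDistinct": 0.1},
)
if link is None:
    print("link creation failed")
```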
preview_links(data, entity_resolution=None)
Preview what recalculating links would produce without saving changes.
- Parameters:
- data (Dataset) – Array of ontology data objects (e.g. from query_data()).
- entity_resolution (Optional[MatchTypeConfig]) – Optional link matching configuration.
- Returns: Preview of link objects, or None on failure.
- Return type: Optional[List[Dict]]
- Raises: DataLinksRequestError – If the HTTP request fails.
rebuild_links(data, entity_resolution=None)
Recalculate links for the configured dataset based on current data.
- Parameters:
- data (Dataset) – Array of ontology data objects (e.g. from query_data()).
- entity_resolution (Optional[MatchTypeConfig]) – Optional link matching configuration.
- Returns: Updated list of link objects, or None on failure.
- Return type: Optional[List[Dict]]
- Raises: DataLinksRequestError – If the HTTP request fails.
load_links()
Retrieve active and suggested links for the configured dataset.
- Returns: A list of link objects, or None on failure.
- Return type: Optional[List[Dict]]
list_datasets(namespace=None)
Retrieves the list of datasets for the user, optionally filtered by a specific namespace.
- Parameters: namespace (Optional[AnyStr]) – Optional namespace to filter the datasets by. If provided, only datasets associated with the given namespace are returned. If not provided, all datasets are retrieved.
- Returns: A list of datasets represented as dictionaries if the query succeeds with status code 200, or None if the query fails or encounters an error.
- Return type: List[Dict] | None
query_data(query=None, is_natural_language=False, model=None, provider=None, include_metadata=False)
Queries data from a specified data source and processes the response. The method allows querying with a specific query string or with a wildcard ("*") for all data. If include_metadata is False, metadata fields are filtered out of the response; metadata fields are identified by key names starting with an underscore.
- Parameters:
- query (str) – The query string to use for fetching data. Defaults to "*", which retrieves all data.
- is_natural_language (bool) – If True, the query is treated as a natural language question rather than a query string.
- model (str) – The model name to use for inference.
- provider (str) – The provider of the LLM model (e.g. ollama, openai).
- include_metadata (bool) – Specifies whether to include metadata fields in the returned data. Defaults to False.
- Returns: A list of records represented as dictionaries, or None if the query fails or an exception occurs during the request.
- Return type: List[Dict] | None
- Raises: requests.exceptions.RequestException – If a request-related error occurs during querying.
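A querying sketch showing both modes; the model and provider names are placeholders:

```python
from datalinks.api import DataLinksAPI, DLConfig

api = DataLinksAPI(DLConfig.from_env())

# Fetch everything, with metadata fields stripped (the default).
rows = api.query_data(query="*")

# Or ask in natural language, delegating to an LLM.
rows = api.query_data(
    query="people living in London",
    is_natural_language=True,
    model="gpt-4o",      # placeholder model name
    provider="openai",   # placeholder provider
)
if rows is not None:
    for row in rows:
        print(row)
```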
ask(query, model=None, provider=None, helper_prompt=None)
Talk to your data in natural language using the DataLinks AutoRAG agent. Streams the agent's reasoning and final answer as Server-Sent Events. Events are yielded in order: one plan event, one or more step events, then either an answer event or an error event.
- Parameters:
- query (str) – The natural language question to answer.
- model (str) – The model name to use for inference.
- provider (str) – The LLM provider (e.g. openai, ollama).
- helper_prompt (str) – Optional custom system prompt.
- Returns: An iterator of AskEvent objects.
- Return type: Iterator[AskEvent]
- Raises: DataLinksRequestError – If the HTTP request fails.
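Since ask() streams typed events, a caller typically dispatches on event.type. A small sketch (model and provider names are placeholders; render_event works on any object with .type and .data attributes):

```python
def render_event(event) -> str:
    """Format one AskEvent-like object (with .type and .data) for display.
    Unknown types are treated as errors."""
    if event.type == "plan":
        return f"[plan] {event.data}"
    if event.type == "step":
        return f"[step] {event.data}"
    if event.type == "answer":
        return f"[answer] {event.data}"
    return f"[error] {event.data}"

def ask_and_print(api, question: str) -> None:
    """Stream events from the agent and print them as they arrive."""
    for event in api.ask(question, model="gpt-4o", provider="openai"):
        print(render_event(event))
```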
datalinks.cli module
class datalinks.cli.StandardCLI(name='datalinks-client', description='Infer and link your data!')
Bases: object
Command-Line Interface (CLI) wrapper for customizable argument parsing.
Simplifies the creation and usage of the DataLinks CLI by allowing a
custom callback function to be passed for additional arguments specific to an
application. It provides a standard set of CLI arguments while enabling
customization through user-defined groups.
- Variables:
- name – The name of the CLI program.
- description – The description of the CLI program.
datalinks.cli.get_default_args()
- Return type:
Namespace
datalinks.links module
class datalinks.links.MatchType(targetColumns=None)
Bases: object
The base type for entity resolution operators.
- Variables: targetColumns – A list of column names that are the target for matching. If None, all columns are used for entity resolution.
targetColumns : List[str] | None = None
class datalinks.links.ExactMatch(targetColumns=None, minVariation=None, minDistinct=None)
Bases: MatchType
Use this match type to evaluate and configure specific exact matching criteria
for the data values.
- Variables:
- minVariation – Minimum allowable variation in the field to check for matches (defaults to 0.0).
- minDistinct – Minimum percentage of distinct values in the field to check for matches (defaults to 0.0).
minVariation : float | None = None
minDistinct : float | None = None
class datalinks.links.GeoMatch(targetColumns=None, distance=None, distanceUnit=None)
Bases: MatchType
Use this match type to check for matches in fields that represent geographical
attributes.
- Variables:
- distance – The maximum distance value for the geographical match (defaults to 2.0).
- distanceUnit – The unit of measurement for the distance, such as kilometers or miles (defaults to ‘kilometers’).
distance : float | None = None
distanceUnit : str | None = None
class datalinks.links.EntityResolutionTypes(value)
Bases: StrEnum
Enumerates the various resolution strategies for handling
matching or reconciliation of entity data. Each enumeration value
specifies a particular method or approach used for determining
entity equivalence or correspondence.
- Variables:
- ExactMatch – Used when entities are determined to be equivalent based on exact value matches without any approximation.
- GeoMatch – Used when entities are matched based on their geographical location or proximity.
ExactMatch = 'ExactMatch'
GeoMatch = 'GeoMatch'
class datalinks.links.MatchTypeConfig(exact_match=None, geo_match=None)
Bases: object
Encapsulates configuration related to different types of entity resolution matches.
This class is designed to store, manage, and provide access to various entity resolution
match type configurations, such as ExactMatch and GeoMatch. It maintains internal
state for these match types and also provides access to a consolidated configuration
in dictionary format.
- Variables: matchTypes – A dictionary mapping entity resolution types to their respective match configurations (e.g., ExactMatch, GeoMatch).
matchTypes : dict[EntityResolutionTypes, MatchType | None]
property config : Dict[str, Dict] | None
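A sketch combining both match types into one resolution configuration (the target column names and thresholds are illustrative):

```python
from datalinks.links import ExactMatch, GeoMatch, MatchTypeConfig

# Exact-match on name-like columns, geo-match on a coordinate column.
resolution = MatchTypeConfig(
    exact_match=ExactMatch(targetColumns=["name", "city"], minDistinct=0.1),
    geo_match=GeoMatch(targetColumns=["location"], distance=5.0,
                       distanceUnit="kilometers"),
)

# The consolidated dictionary form, e.g. for inspection or logging.
print(resolution.config)

# Pass it to ingestion to control how links are resolved:
# api.ingest(records, entity_resolution=resolution)
```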
datalinks.loaders module
class datalinks.loaders.Loader(folder)
Bases: ABC
Abstract base class for loading resources from a specified folder.
It serves as a template for loading files or other resources
while maintaining consistency across different implementations.
- Variables: folder – Path to the folder from which resources will be loaded.
abstractmethod load_from_folder()
class datalinks.loaders.JSONLoader(folder)
Bases: Loader
A loader for processing JSON files in a specified folder.
Iterates through all .json files within a given folder,
parses their content, and processes each JSON object into
a standardized format using the load_item method.
- Variables: folder – Path to the folder containing JSON files. All .json files in this folder will be processed.
load_from_folder()
- Return type:
List[Dict[str,str]]
abstractmethod load_item(row)
- Return type:
Dict[str,str]
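Since load_item is abstract, JSONLoader is used by subclassing it and mapping each raw JSON object to the flat string dict the loader expects. A sketch (the folder path and field names are illustrative):

```python
from datalinks.loaders import JSONLoader

class PeopleLoader(JSONLoader):
    """Loads person records from .json files in a folder."""

    def load_item(self, row):
        # Map each raw JSON object to a flat Dict[str, str].
        # The source field names here are illustrative only.
        return {
            "name": str(row.get("full_name", "")),
            "city": str(row.get("address", {}).get("city", "")),
        }

records = PeopleLoader("data/people").load_from_folder()
```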
datalinks.pipeline module
class datalinks.pipeline.StepTypes(value)
Bases: Enum
Represents different types of processing steps for data manipulation.
This class enumerates various distinct processing types that can be
used in DataLinks workflows. Each enumeration value signifies a specific
stage in the broader data-processing pipeline.
TABLE = 'table'
ROWS = 'rows'
NORMALIZE = 'normalise'
VALIDATE = 'validate'
REVERSE_GEO = 'reverseGeo'
class datalinks.pipeline.NormalizeModes(value)
Bases: Enum
Enumeration for normalization modes.
This class represents different modes of data normalization
used in the 'normalize' step. It provides three options
for normalization: 'embeddings' for embedding-level normalization,
'all-in-one' for holistic normalization, and 'field-by-field'
for column-wise normalization.
- Variables:
- EMBEDDINGS – Mode for normalizing data on an embedding level.
- ALL_IN_ONE – Mode for normalizing data holistically, treating the entire dataset as a single entity.
- FIELD_BY_FIELD – Mode for normalizing data column-by-column, focusing on individual fields independently.
EMBEDDINGS = 'embeddings'
ALL_IN_ONE = 'all-in-one'
FIELD_BY_FIELD = 'field-by-field'
class datalinks.pipeline.ValidateModes(value)
Bases: Enum
Enumeration class that defines various validation modes.
This class is designed to specify the modes of operation for the 'validate'
step. The predefined modes include validation by rows, regular
expressions, and fields.
- Variables:
- ROWS – Validation mode that focuses on rows.
- REGEX – Validation mode that utilizes regular expressions.
- FIELDS – Validation mode that focuses on columns.
ROWS = 'rows'
REGEX = 'regex'
FIELDS = 'fields'
class datalinks.pipeline.BaseStep
Bases: object
Represents the base step within DataLinks.
This class serves as the foundational step structure for various
implementations. It includes methods to transform its data
representation into a dictionary format, with custom processing
rules for attributes of Enum type. It is primarily designed as a base class.
- Variables: step_type – The type of the step, categorized using StepTypes.
step_type : ClassVar[StepTypes]
to_dict()
- Return type:
dict
class datalinks.pipeline.LlmStep(model, provider)
Bases: BaseStep
Common class for pipeline steps that rely on LLM inference.
- Variables:
- model – The name of the model to use in the step.
- provider – The identifier of the provider to be used (e.g. ollama, openai).
model : str | None
provider : str | None
class datalinks.pipeline.InferenceStep(derive_from)
Bases: BaseStep
Represents the 'infer' step in the DataLinks workflow.
- Variables: derive_from – The identifier of the source field used in the inference step.
derive_from : str
class datalinks.pipeline.ProcessUnstructured(derive_from, model, provider, helper_prompt='')
Bases: LlmStep, InferenceStep
Use this step to infer a table from unstructured data.
- Variables: helper_prompt – A string that stores an optional helper prompt or additional guiding context specific to the table inference step.
step_type : ClassVar[StepTypes] = 'table'
helper_prompt : str = ''
class datalinks.pipeline.ProcessStructured(derive_from)
Bases: InferenceStep
Use this step to extract data that is already in tabular format (e.g. CSV).
step_type : ClassVar[StepTypes] = 'rows'
class datalinks.pipeline.ReverseGeo(derive_from)
Bases: InferenceStep
Use this step to perform reverse geolocation based on the source field.
step_type : ClassVar[StepTypes] = 'reverseGeo'
class datalinks.pipeline.Normalize(model, provider, target_cols, mode, helper_prompt='')
Bases: LlmStep
Use this step to attempt normalisation of the extracted column names. Table
inference across different unstructured data blocks may result in different field names
for the same information, hence the need to normalize the column names.
Encapsulates the configuration necessary to perform the ‘normalize’ step.
It specifies the desired target columns, the mode of normalisation, and includes optional
helper prompts to provide further instructions or context.
- Variables:
- target_cols – A mapping of the desired column names to an optional description used as context.
- mode – Specifies the normalisation mode to be applied.
- helper_prompt – Optional helper text or prompt information.
step_type : ClassVar[StepTypes] = 'normalise'
target_cols : Mapping[str, str | None]
mode : NormalizeModes
helper_prompt : str = ''
class datalinks.pipeline.Validate(model, provider, mode, columns)
Bases: LlmStep
Use this step to add data validation to the inference pipeline.
- Variables:
- mode – Indicates the mode of validation to be applied.
- columns – List containing the column names which are used for validation.
step_type : ClassVar[StepTypes] = 'validate'
mode : ValidateModes
columns : List[str]
class datalinks.pipeline.Pipeline(*steps)
Bases: object
Represents a collection of sequential steps. Holds and manages the
sequence of steps used for ingesting and/or enhancing data.
- Variables: steps – A collection of steps to be executed in sequence.
to_list()
- Return type:
list[dict[str,Any]]
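Putting the step classes together, a pipeline for unstructured text might extract a table, normalise column names, then validate fields. A sketch (model and provider names, the source field, and the target columns are all placeholders):

```python
from datalinks.pipeline import (
    Normalize, NormalizeModes, Pipeline, ProcessUnstructured,
    Validate, ValidateModes,
)

steps = Pipeline(
    # Infer a table from an unstructured source field.
    ProcessUnstructured(
        derive_from="raw_text",
        model="gpt-4o",
        provider="openai",
        helper_prompt="Each block describes one person.",
    ),
    # Reconcile field names column by column; descriptions are optional context.
    Normalize(
        model="gpt-4o",
        provider="openai",
        target_cols={"name": "Full name", "city": None},
        mode=NormalizeModes.FIELD_BY_FIELD,
    ),
    # Validate the normalised columns.
    Validate(
        model="gpt-4o",
        provider="openai",
        mode=ValidateModes.FIELDS,
        columns=["name", "city"],
    ),
)

# Serializable form of the pipeline, as sent with an ingest request:
# api.ingest(records, inference_steps=steps)
print(steps.to_list())
```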