
Configure Dagster with OpenLineage

This guide shows you how to configure Dagster to send OpenLineage events to your Ataccama ONE orchestrator connection using the Ataccama OpenLineage sensor.

Before you begin, ensure you have generated an API key and copied your endpoint URL.

Prerequisites

Ataccama OpenLineage sensor file

The integration requires a private sensor implementation file (openlineage_extended.py) provided by Ataccama. This file is not publicly distributed; contact your Ataccama Customer Success Manager to obtain it before proceeding.

Compatibility

The sensor has been tested with the following versions:

Component            Version
Dagster              1.6.x - 1.12.x
openlineage-python   1.38.0 or later
Python               3.10 - 3.12

Install the OpenLineage integration

pip install "openlineage-python>=1.38.0" attrs

Copy the provided openlineage_extended.py file to your Dagster project directory.

Configure the transport

The sensor can be configured using either environment variables or a configuration file.

Option 1: Environment variables

Set the following environment variables:

Variable                Required  Description
OPENLINEAGE_URL         Yes       Your Ataccama ONE endpoint URL
OPENLINEAGE_API_KEY     Yes       API key from your orchestrator connection
OPENLINEAGE_NAMESPACE   No        Namespace for organizing events (default: dagster)
DATABASE_URL            Yes       Database connection URL for dataset namespace extraction
DATABASE_SCHEMA         No        Override schema name extracted from DATABASE_URL (default: public)

Example:

export OPENLINEAGE_URL="https://<YOUR_INSTANCE>.ataccama.one/gateway/openlineage/<CONNECTION_ID>/"
export OPENLINEAGE_API_KEY="<YOUR_API_KEY>"
export OPENLINEAGE_NAMESPACE="production"
export DATABASE_URL="postgresql://user:password@host:5432/database"
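Because a missing variable only surfaces later as absent lineage events, a quick preflight check can catch configuration gaps before Dagster starts. This is a minimal sketch (not part of the sensor itself) that verifies the required variables from the table above are set:

```python
import os

# Variables the sensor requires, per the table above.
REQUIRED_VARS = ["OPENLINEAGE_URL", "OPENLINEAGE_API_KEY", "DATABASE_URL"]

def check_openlineage_env() -> list[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not os.environ.get(name)]

missing = check_openlineage_env()
print("Missing variables:", ", ".join(missing) if missing else "none")
```

Run this in the same environment your Dagster instance starts from, since variables exported in an interactive shell may not reach a daemonized deployment.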

Option 2: Configuration file

Create an openlineage.yml file in one of these locations:

  • Current working directory

  • $HOME/.openlineage/

  • Custom path specified by OPENLINEAGE_CONFIG environment variable

Example openlineage.yml:

transport:
  type: http
  url: https://<YOUR_INSTANCE>.ataccama.one/gateway/openlineage/<CONNECTION_ID>/
  endpoint: api/v1/lineage
  auth:
    type: api_key
    apiKey: <YOUR_API_KEY>
When using a configuration file, you still need to set DATABASE_URL as an environment variable for dataset namespace extraction.
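The exact namespace format the sensor produces is internal to the Ataccama implementation, but the kind of information it derives from DATABASE_URL can be illustrated with a standard-library sketch (the dataset_namespace helper below is hypothetical, not part of the sensor):

```python
from urllib.parse import urlsplit

def dataset_namespace(database_url: str) -> str:
    """Illustrative only: build an OpenLineage-style dataset namespace
    (scheme://host:port) from a database connection URL."""
    parts = urlsplit(database_url)
    return f"{parts.scheme}://{parts.hostname}:{parts.port}"

print(dataset_namespace("postgresql://user:password@host:5432/database"))
# -> postgresql://host:5432
```

If the troubleshooting section below reports missing datasets, checking that your DATABASE_URL parses cleanly like this is a reasonable first step.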

Register the sensor

Add the sensor to your Dagster definitions:

from dagster import Definitions
from my_project.openlineage_extended import openlineage_sensor_with_datasets

openlineage_sensor = openlineage_sensor_with_datasets(
    name="openlineage_sensor",
    minimum_interval_seconds=30,
)

defs = Definitions(
    assets=[...],
    sensors=[openlineage_sensor],
)

Enable the sensor

After deploying your Dagster instance, enable the sensor using one of these methods:

Dagster UI

Navigate to Automation > Sensors and start the openlineage_sensor.

CLI
dagster sensor start openlineage_sensor -w /path/to/workspace.yaml
The -w flag specifies the workspace file. Omit it if running from a directory with a workspace.yaml or pyproject.toml that defines the code location.

Verify the connection

  1. Enable the sensor in Dagster.

  2. Run a job or materialize one or more assets.

  3. Wait approximately 30 seconds for events to be emitted.

  4. Verify job executions appear in your ONE orchestrator connection.

Capture asset metadata (optional)

To include row counts and schema information in OpenLineage events, add metadata to your assets:

from dagster import asset, AssetExecutionContext, MetadataValue

@asset
def my_asset(context: AssetExecutionContext):
    df = process_data()  # your existing transformation logic

    context.add_output_metadata({
        "row_count": MetadataValue.int(len(df)),
        "column_types": MetadataValue.json({
            col: str(dtype) for col, dtype in df.dtypes.items()
        }),
    })

    return df

The following metadata keys are supported:

Metadata key   Description
row_count      Number of rows in the output dataset
column_types   Dictionary mapping column names to data types
sql_queries    List of SQL queries executed

When materializing assets directly from the Assets page, Dagster uses an implicit job called __ASSET_JOB. To see meaningful job names in ONE, define explicit jobs:

from dagster import Definitions, define_asset_job

my_pipeline = define_asset_job(
    name="my_data_pipeline",
    selection="*",
)

defs = Definitions(
    assets=[...],
    jobs=[my_pipeline],
    sensors=[openlineage_sensor],
)

Run the job via Jobs > my_data_pipeline > Launchpad > Launch Run instead of the Assets page.

Troubleshooting

No events appearing

  • Verify the sensor is running (green status in Automation > Sensors).

  • Check that environment variables are set correctly.

  • Ensure assets have been materialized.

Authentication errors

  • Verify the API key is correct.

  • Ensure the endpoint URL is correct and accessible.

Missing datasets

  • Ensure DATABASE_URL environment variable is set.

  • Verify the URL format is correct for your database type.

Event log storage error

If you see an error mentioning run-sharded event log storage or RunShardedEventsCursor, your Dagster instance is using the default SQLite storage which doesn’t support the sensor’s cursor-based queries.

Configure consolidated event log storage by creating a dagster.yaml file:

event_log_storage:
  module: dagster._core.storage.event_log
  class: ConsolidatedSqliteEventLogStorage
  config:
    base_dir: .dagster_home

run_storage:
  module: dagster._core.storage.runs
  class: SqliteRunStorage
  config:
    base_dir: .dagster_home

schedule_storage:
  module: dagster._core.storage.schedules
  class: SqliteScheduleStorage
  config:
    base_dir: .dagster_home
This issue typically occurs in development environments. Production deployments using PostgreSQL storage are not affected.
