# Configure Dagster with OpenLineage
This guide shows you how to configure Dagster to send OpenLineage events to your Ataccama ONE orchestrator connection using the Ataccama OpenLineage sensor.
Before you begin, ensure you have generated an API key and copied your endpoint URL.
## Prerequisites

- **Ataccama OpenLineage sensor file**: The integration requires a private sensor implementation file (`openlineage_extended.py`) provided by Ataccama. This file is not publicly distributed and is available on request. Contact your Ataccama Customer Success Manager to obtain the sensor file before proceeding.
## Compatibility

The sensor has been tested with the following versions:

| Component | Version |
|---|---|
| Dagster | 1.6.x - 1.12.x |
| openlineage-python | 1.38.0 or later |
| Python | 3.10 - 3.12 |
## Install the OpenLineage integration

```bash
pip install "openlineage-python>=1.38.0" attrs
```

Copy the provided `openlineage_extended.py` file to your Dagster project directory.
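To confirm the installation succeeded, you can check that the client library is importable and that the installed version meets the minimum (a quick local sanity check, not a required step):

```bash
# Confirm the OpenLineage client package is importable
python -c "import openlineage.client; print('openlineage-python is importable')"
# Check that the installed version is 1.38.0 or later
pip show openlineage-python
```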
## Configure the transport

The sensor can be configured using either environment variables or a configuration file.

### Option 1: Environment variables

Set the following environment variables:

| Variable | Required | Description |
|---|---|---|
| `OPENLINEAGE_URL` | Yes | Your Ataccama ONE endpoint URL |
| `OPENLINEAGE_API_KEY` | Yes | API key from your orchestrator connection |
| `OPENLINEAGE_NAMESPACE` | No | Namespace for organizing events (default: `default`) |
| `DATABASE_URL` | Yes | Database connection URL for dataset namespace extraction |
| | No | Override schema name extracted from `DATABASE_URL` |
Example:

```bash
export OPENLINEAGE_URL="https://<YOUR_INSTANCE>.ataccama.one/gateway/openlineage/<CONNECTION_ID>/"
export OPENLINEAGE_API_KEY="<YOUR_API_KEY>"
export OPENLINEAGE_NAMESPACE="production"
export DATABASE_URL="postgresql://user:password@host:5432/database"
```
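The `DATABASE_URL` value is used to build the dataset namespace attached to lineage events. The sketch below, using only Python's standard library, illustrates the common `scheme://host:port` convention; the sensor's actual extraction logic is internal to `openlineage_extended.py` and may differ:

```python
from urllib.parse import urlparse

# Illustrative only: OpenLineage dataset namespaces are conventionally
# of the form "scheme://host:port", derived from the connection URL.
url = urlparse("postgresql://user:password@host:5432/database")
print(f"{url.scheme}://{url.hostname}:{url.port}")  # postgresql://host:5432
```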
### Option 2: Configuration file

Create an `openlineage.yml` file in one of these locations:

- Current working directory
- `$HOME/.openlineage/`
- Custom path specified by the `OPENLINEAGE_CONFIG` environment variable
Example `openlineage.yml`:

```yaml
transport:
  type: http
  url: https://<YOUR_INSTANCE>.ataccama.one/gateway/openlineage/<CONNECTION_ID>/
  endpoint: api/v1/lineage
  auth:
    type: api_key
    apiKey: <YOUR_API_KEY>
```

When using a configuration file, you still need to set `DATABASE_URL` as an environment variable for dataset namespace extraction.
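For example, to load the file from a custom path via `OPENLINEAGE_CONFIG` (the path shown is illustrative):

```bash
# Point the OpenLineage client at a configuration file outside the default search paths
export OPENLINEAGE_CONFIG=/etc/openlineage/openlineage.yml
# Still required: the sensor reads DATABASE_URL from the environment
export DATABASE_URL="postgresql://user:password@host:5432/database"
```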
## Register the sensor

Add the sensor to your Dagster definitions:

```python
from dagster import Definitions

from my_project.openlineage_extended import openlineage_sensor_with_datasets

openlineage_sensor = openlineage_sensor_with_datasets(
    name="openlineage_sensor",
    minimum_interval_seconds=30,
)

defs = Definitions(
    assets=[...],
    sensors=[openlineage_sensor],
)
```
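For local development, you can load these definitions with Dagster's dev server (assuming they live in a file named `definitions.py`; adjust to your project layout):

```bash
# Start the local Dagster webserver and daemon with your definitions loaded
dagster dev -f definitions.py
```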
## Enable the sensor

After deploying your Dagster instance, enable the sensor using one of these methods:

- **Dagster UI**: Navigate to Automation > Sensors and start the `openlineage_sensor`.
- **CLI**:

  ```bash
  dagster sensor start openlineage_sensor -w /path/to/workspace.yaml
  ```

  The `-w` flag specifies the workspace file. Omit it if running from a directory with a `workspace.yaml` or `pyproject.toml` that defines the code location.
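If your deployment does not yet have a workspace file, a minimal `workspace.yaml` might look like the sketch below (the file name `definitions.py` is an assumption; point it at your project's entry module):

```yaml
# Minimal workspace file: tells Dagster where to load code locations from
load_from:
  - python_file: definitions.py
```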
## Verify the connection

1. Enable the sensor in Dagster.
2. Run a job or materialize one or more assets.
3. Wait approximately 30 seconds for events to be emitted.
4. Verify that job executions appear in your ONE orchestrator connection.
## Capture asset metadata (optional)

To include row counts and schema information in OpenLineage events, add metadata to your assets:

```python
from dagster import asset, AssetExecutionContext, MetadataValue


@asset
def my_asset(context: AssetExecutionContext):
    df = process_data()
    context.add_output_metadata({
        "row_count": MetadataValue.int(len(df)),
        "column_types": MetadataValue.json({
            col: str(dtype) for col, dtype in df.dtypes.items()
        }),
    })
    return df
```
The following metadata keys are supported:

| Metadata Key | Description |
|---|---|
| `row_count` | Number of rows in output dataset |
| `column_types` | Dictionary of column names to data types |
| | List of SQL queries executed |
## Define explicit job names (recommended)

When materializing assets directly from the Assets page, Dagster uses an implicit job called `__ASSET_JOB`. To see meaningful job names in ONE, define explicit jobs:

```python
from dagster import Definitions, define_asset_job

my_pipeline = define_asset_job(
    name="my_data_pipeline",
    selection="*",
)

defs = Definitions(
    assets=[...],
    jobs=[my_pipeline],
    sensors=[openlineage_sensor],
)
```

Run the job via Jobs > my_data_pipeline > Launchpad > Launch Run instead of the Assets page.
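If you want a named job to cover only part of the asset graph instead of everything (`selection="*"`), Dagster's `AssetSelection` API can scope the selection; the group name below is a placeholder:

```python
from dagster import AssetSelection, define_asset_job

# Scope the job to one asset group ("analytics" is a placeholder group name)
analytics_pipeline = define_asset_job(
    name="analytics_pipeline",
    selection=AssetSelection.groups("analytics"),
)
```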
## Troubleshooting

### No events appearing

- Verify the sensor is running (green status in Automation > Sensors).
- Check that environment variables are set correctly.
- Ensure assets have been materialized.

### Authentication errors

- Verify the API key is correct.
- Ensure the endpoint URL is correct and accessible.

### Missing datasets

- Ensure the `DATABASE_URL` environment variable is set.
- Verify the URL format is correct for your database type, as illustrated below.
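Common connection URL shapes for reference (hosts, ports, and credentials are placeholders; confirm the scheme your database expects):

```bash
# PostgreSQL
export DATABASE_URL="postgresql://user:password@host:5432/database"
# MySQL
export DATABASE_URL="mysql://user:password@host:3306/database"
```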
### Event log storage error

If you see an error mentioning run-sharded event log storage or `RunShardedEventsCursor`, your Dagster instance is using the default SQLite storage, which doesn't support the sensor's cursor-based queries.

Configure consolidated event log storage by creating a `dagster.yaml` file:

```yaml
event_log_storage:
  module: dagster._core.storage.event_log
  class: ConsolidatedSqliteEventLogStorage
  config:
    base_dir: .dagster_home

run_storage:
  module: dagster._core.storage.runs
  class: SqliteRunStorage
  config:
    base_dir: .dagster_home

schedule_storage:
  module: dagster._core.storage.schedules
  class: SqliteScheduleStorage
  config:
    base_dir: .dagster_home
```

This issue typically occurs in development environments. Production deployments using PostgreSQL storage are not affected.
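Note that Dagster only picks up `dagster.yaml` from the directory referenced by the `DAGSTER_HOME` environment variable, so set it before starting your instance (the path is an example):

```bash
# Dagster reads dagster.yaml from the directory in $DAGSTER_HOME
export DAGSTER_HOME=/path/to/.dagster_home
```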