It used to take us three weeks to ship a single data pipeline. Today, an analyst with zero Python experience does it in a day. Here’s how we got there.
I’m Kiril Kazlou, a data engineer at Mindbox. Our team regularly recalculates business metrics for clients, which means we’re constantly building data marts for billing and analytics, pulling from dozens of different sources.
For a long time, we relied on PySpark for all our data processing. The problem? You can’t really work with PySpark without Python skills. Every new pipeline required a developer. And that meant waiting, sometimes for weeks.
In this post, I’ll walk you through how we built an internal data platform where an analyst or product manager can spin up a continuously updated pipeline by writing just four YAML files.
Why PySpark Was Slowing Us Down
Let me illustrate the pain with a textbook example: calculating MAU (Monthly Active Users).
On the surface, this looks like a simple SQL task: COUNT(DISTINCT customerId) across a few tables over a time window. But because of all the infrastructure overhead (PySpark, Airflow DAG setup, Spark resource allocation, testing), we had to hand it off to developers. The result? A full week just to ship a MAU counter.
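To make the contrast concrete, the ad-hoc version of that metric is only a few lines of SQL (the table and column names below are purely illustrative):
-- Hypothetical one-off MAU query an analyst could run ad hoc in Superset
SELECT COUNT(DISTINCT customerId) AS mau
FROM final.customerstracking_visits
WHERE visitDateTime >= CURRENT_DATE - INTERVAL '1' MONTH;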
Every new metric took one to three weeks to ship. And every single time, the process looked the same:
- An analyst defined the business requirements, found an available developer, and handed over the context.
- The developer clarified the details, wrote PySpark code, went through code review, configured the DAG, and deployed.
What we actually wanted was for analysts and product managers, the people who understand the business logic best and are fluent in SQL and YAML, to handle this themselves. No Python. No PySpark.

What We Replaced PySpark With: YAML and SQL Are All You Need
To go declarative, we split our data layer into three parts and picked the right tool for each:
- dlt (data load tool): ingests data from external APIs and databases into object storage. Configured entirely through a YAML file. No code required.
- dbt (data build tool) on Trino: transforms data using pure SQL. It links models via ref(), automatically builds a dependency graph, and handles incremental updates.
- Airflow + Cosmos: orchestrates the pipelines. The Airflow DAG is auto-generated from dag.yaml and the dbt project.
We were already using Trino as a query engine for ad-hoc queries and had it plugged into Superset for BI. It had already proven itself: for queries with standard logic, it processed huge datasets faster and with fewer resources than Spark. On top of that, Trino natively supports federated access to multiple data stores from a single SQL query. For 90% of our pipelines, Trino was a perfect fit.
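As an illustration of what that federation buys you, a single Trino query can join a table in object storage with a table in an operational database (the catalog and table names below are hypothetical):
-- Hypothetical federated query across two Trino catalogs in one statement
SELECT o.order_id, c.segment
FROM lakehouse.final.processingorders_orders o
JOIN postgres.crm.customers c ON o.customer_id = c.id
WHERE o.order_date >= DATE '2024-01-01';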

How We Load Data: dlt.yaml
The first YAML file describes where and how to load the data for downstream processing. Here’s a real-world example: loading billing data from an internal API:
product: sg-team
function: billing
schema: billing_tarification
dag:
  dag_id: dlt_billing_tarification
  schedule: "0 4 * * *"
  description: "Daily refresh of tarification data"
  tags:
    - billing
alerts:
  enabled: true
  severity: warning
source:
  type: rest_api
  client:
    base_url: "https://internal-api.example.com"
    auth:
      type: bearer
      token: dlt-billing.token
  sources:
    - name: tarification_data
      endpoint:
        path: /tarificationData
        method: POST
        json:
firstPeriod: "%- set months_back = var('months_back', 5) "
lastPeriod: " int -%"
          pricingPlanLine: CurrentPlan
      write_disposition: replace
      processing_steps:
        - map: dlt_custom.billing_tarification_data.map
    - name: charges_raw
      columns:
        staffUserName:
          data_type: text
          nullable: true
      endpoint:
        path: /data-feed/charges
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace
    - name: discounts_raw
      endpoint:
        path: /data-feed/discounts
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace
This config defines four sources pulled from a single API. For each one, we specify the endpoint, the request parameters, and a write disposition; in our case, replace means “overwrite every time.” You can also add processing steps, define column types, and configure alerts.
The full config is 40 lines of YAML. Without dlt, each connector would be a Python script handling requests, pagination, retries, serialization to Delta table format, and uploads to storage.
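The map step referenced above (dlt_custom.billing_tarification_data.map) points at a small record-level hook. Here is a minimal sketch of what such a hook can look like; the field handling is made up for illustration:
# dlt_custom/billing_tarification_data.py: hypothetical sketch of the map hook
# referenced from processing_steps; field names are illustrative
def map(record: dict) -> dict:
    """Normalize one raw tarification record before it lands in storage."""
    record["staffUserName"] = (record.get("staffUserName") or "").strip() or None
    record["periodAmount"] = float(record.get("periodAmount") or 0)
    return record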
How We Transform Data With SQL: dbt_project.yaml and sources.yaml
The next step is configuring the dbt models. With Trino, that means SQL queries.
Here’s an example of how we set up the MAU calculation. This is what event preparation from a single source looks like:
-- int_mau_events_visits.sql (simplified)
{{ config(materialized='table') }}
WITH period AS (
    -- Rolling window: last 5 months through the current month
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),
events AS (
    -- Pull visit events within the period window
    SELECT src._tenant, src.unmergedCustomerId,
           'visits' AS src_type, src.endpoint
    FROM {{ source('final', 'customerstracking_visits') }} src
    CROSS JOIN period p
    WHERE src.unmergedCustomerId IS NOT NULL
      AND /* ...timestamp filtering by year/month bounds... */
),
events_with_customer AS (
    -- Resolve merged customer IDs
    SELECT e._tenant,
           COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId,
           e.src_type, e.endpoint
    FROM events e
    LEFT JOIN {{ ref('int_merged_customers') }} mc
        ON e._tenant = mc._tenant
        AND e.unmergedCustomerId = mc.unmergedCustomerId
)
-- Keep only actual (non-deleted) customers
SELECT ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint
FROM events_with_customer ewc
WHERE EXISTS (
    SELECT 1 FROM {{ ref('int_actual_customers') }} ac
    WHERE ewc._tenant = ac._tenant
      AND ewc.customerId = ac.customerId
)
All 10 event sources follow exactly the same pattern; the only differences are the source table and the filters. The models then merge into a single stream:
-- int_mau_events.sql (union of all sources)
SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_visits') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_orders') }}
-- ...plus 6 more sources
And finally, the data mart where everything gets aggregated:
-- mau_period_datamart.sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=['_tenant', 'start_year', 'start_month', 'end_year', 'end_month']
) }}
{%- set months_back = var('months_back', 5) | int -%}
WITH period AS (
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),
events_resolved AS (
    SELECT * FROM {{ ref('int_mau_events') }}
),
metrics_by_tenant AS (
    SELECT
        er._tenant,
        COUNT(DISTINCT CASE WHEN src_type = 'visits'
                            THEN customerId END) AS CustomersTracking_Visits,
        COUNT(DISTINCT CASE WHEN src_type = 'orders'
                            THEN customerId END) AS ProcessingOrders_Orders,
        COUNT(DISTINCT CASE WHEN src_type = 'mailings'
                            THEN customerId END) AS Mailings_MessageStatuses,
        -- ...other metrics
        COUNT(DISTINCT customerId) AS MAU
    FROM events_resolved er
    GROUP BY er._tenant
)
SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month
FROM metrics_by_tenant m
CROSS JOIN period p
For the data mart configuration, we use incremental_strategy='merge'. dbt automatically generates the merge query, using the unique_key for the upsert. There’s no need to implement incremental loading by hand.
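For reference, dbt-trino compiles such an incremental run into a Trino MERGE statement roughly like this (a simplified sketch; the generated SQL lists every column explicitly):
-- Simplified sketch of the MERGE dbt generates for incremental_strategy='merge'
MERGE INTO mau.mau_period_datamart AS dst
USING mau_period_datamart__dbt_tmp AS src
    ON  dst._tenant     = src._tenant
    AND dst.start_year  = src.start_year
    AND dst.start_month = src.start_month
    AND dst.end_year    = src.end_year
    AND dst.end_month   = src.end_month
WHEN MATCHED THEN
    UPDATE SET MAU = src.MAU /* ...and the other metric columns... */
WHEN NOT MATCHED THEN
    INSERT VALUES (src._tenant, /* ...all columns... */ src.end_month)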
To tie the models into a single project, we set up dbt_project.yaml:
name: mau_period
version: '1.0.0'
models:
  mau_period:
    +on_table_exists: replace
    +on_schema_change: append_new_columns
And sources.yaml, which describes the input tables:
sources:
  - name: final
    database: data_platform
    schema: final
    tables:
      - name: inapps_targetings_v2
      - name: inapps_clicks_v2
      - name: customerstracking_visits
      - name: processingorders_orders
      - name: cdp_mergedcustomers_v2
      # ...
The result is the same business logic we had in PySpark, but in pure SQL: sources.yaml replaces the typedspark schemas, {{ ref() }} and {{ source() }} replace .get_table(), and automatic execution ordering via the dependency graph replaces manual Spark resource tuning.
How We Configure Airflow: dag.yaml
The fourth configuration file defines when and how Airflow runs the pipeline:
product: sg-team
function: billing
schema: mau
schedule: "15 21 * * *"  # daily at 00:15 MSK
params:
  - name: start_date
    description: "Start date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: end_date
    description: "End date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: months_back
    description: "Months to look back (default: 5)"
    default: 5
alerts:
  enabled: true
  severity: warning
Then our Python script parses dag.yaml and dbt_project.yaml and uses the Cosmos library to generate a fully functional Airflow DAG. This is the only piece of Python code in the entire setup; it is written once and works for every dbt project. Here’s the key part:
def _build_dbt_project_dags(project_path: Path, environ: dict) -> list[DbtDag]:
    dag_config_path = project_path / "dag.yaml"
    config_dict = yaml.safe_load(dag_config_path.read_text())
    config = DagConfig.model_validate(config_dict)

    # YAML params → Airflow Params
    params = {}
    operator_vars = {}
    for param in config.params:
        params[param.name] = Param(
            default=param.default if param.default is not None else "",
            description=param.description,
        )
        operator_vars[param.name] = f"{{{{ params.{param.name} }}}}"

    # Cosmos creates the DAG from the dbt project
    with DbtDag(
        dag_id=f"dbt_{project_path.name}",
        schedule=config.schedule,
        params=params,
        project_config=ProjectConfig(dbt_project_path=project_path),
        profile_config=ProfileConfig(
            profile_name="default",
            target_name=project_name,
            profile_mapping=TrinoLDAPProfileMapping(
                conn_id="trino_default",
                profile_args={
                    "database": profile_database,
                    "schema": profile_schema,
                },
            ),
        ),
        operator_args={"vars": operator_vars},
    ) as dag:
        # Create the schema before running the models
        create_schema = SQLExecuteQueryOperator(
            task_id="create_schema",
            conn_id="trino_default",
            sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...",
        )
        # Attach it to the root tasks
        for unique_id, _ in dag.dbt_graph.filtered_nodes.items():
            task = dag.tasks_map[unique_id]
            if not task.upstream_task_ids:
                create_schema >> task
Cosmos reads manifest.json from the dbt project, parses the model dependency graph, and creates a separate Airflow task for each model. Task dependencies are built automatically from the ref() calls in the SQL.
How Analysts Build Pipelines Without Developers
Now, when an analyst needs a new recurring pipeline, they can put it together in a few steps:
Step 1. Create a folder in the repo: dbt-projects/my_new_pipeline/.
Step 2. If external data ingestion is needed, write a YAML config for dlt.
Step 3. Write SQL models in the models/ folder and describe the sources in sources.yaml.
Step 4. Create dbt_project.yaml and dag.yaml.
Step 5. Push to Git, go through review, merge.
CI/CD builds the dbt project and ships the artifacts to S3. Airflow reads the DAG files from there, and Cosmos parses the dbt project and generates the task graph. On schedule, dbt runs the models on Trino in the correct order. The end result is an updated data mart in the warehouse, accessible through Superset.
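The CI step itself is unremarkable; a rough sketch of what it can look like (the job name, bucket, and paths are illustrative, not our exact setup):
# Illustrative CI job: compile the dbt project and publish the artifacts for Airflow
build-dbt-artifacts:
  script:
    - dbt deps --project-dir dbt-projects/my_new_pipeline
    - dbt parse --project-dir dbt-projects/my_new_pipeline   # produces manifest.json
    - aws s3 sync dbt-projects/my_new_pipeline s3://dbt-artifacts/my_new_pipeline/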
What Changed After the Migration

For analysts to build pipelines on their own, they need to understand the ref() and source() concepts, the difference between table and incremental materializations, and the basics of Git. We ran a few internal workshops and put together step-by-step guides for each type of task.
Why the New Stack Doesn’t Fully Replace PySpark
For about 10% of our pipelines, PySpark is still the only option: when a transformation simply doesn’t fit into SQL. dbt supports Jinja macros, but that’s no substitute for full-blown Python. And it would be dishonest to skip over the limitations of the new tools.
dlt + Delta: experimental upsert support. We use the Delta format in our storage layer. dlt’s Delta connector is marked as experimental, so the merge strategy didn’t work out of the box. We had to find workarounds: in some cases we used replace instead of merge (sacrificing incrementality), and in others we wrote custom processing_steps.
Trino’s limited fault tolerance. Trino does have a fault-tolerance mechanism, but it works by writing intermediate results to S3. At our terabyte-scale data volumes that’s impractical: the sheer number of S3 operations makes it prohibitively expensive. Without fault tolerance enabled, if a Trino worker goes down, the entire query fails. Spark, by contrast, restarts just the failed task. We addressed this with DAG-level retries and by decomposing heavy models into chains of intermediate ones.
UDFs and custom logic. In Spark, you can write custom logic in Python right inside the pipeline, which is extremely convenient. With the new architecture, this is much harder. dbt on top of Trino doesn’t help: Jinja only generates SQL, and dbt’s Python models only work with Snowflake, Databricks, and BigQuery. You can write UDFs in Trino, but only in Java, with all the overhead that entails: a separate repo, a build pipeline, deploying JARs across all the workers. So when a transformation doesn’t fit into SQL, you either end up with an unmaintainable SQL monster or a standalone script that breaks the lineage.
What’s Next: Tests, Model Templates, and Training
Better testing. We had solid pipeline testing in PySpark, but the new architecture is still catching up. Recent dbt versions introduced unit testing: you can now validate SQL model logic against mock data without spinning up the full pipeline. We want to add dbt tests both at the model level and as a separate monitoring layer.
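As a sketch of where this is headed, a unit test for the customer-ID resolution in one of the event models could look something like this (dbt 1.8+ syntax; the rows are made up):
# Hypothetical dbt unit test: verify merged-customer resolution in one event model
unit_tests:
  - name: resolves_merged_customer_ids
    model: int_mau_events_visits
    given:
      - input: source('final', 'customerstracking_visits')
        rows:
          - {_tenant: "shop", unmergedCustomerId: 10, endpoint: "site"}
      - input: ref('int_merged_customers')
        rows:
          - {_tenant: "shop", unmergedCustomerId: 10, mergedCustomerId: 1}
      - input: ref('int_actual_customers')
        rows:
          - {_tenant: "shop", customerId: 1}
    expect:
      rows:
        - {_tenant: "shop", customerId: 1, src_type: "visits", endpoint: "site"}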
Reusable templates for common patterns. Many of our dbt models look alike; a single config could describe a dozen models that share the same pattern, where only the source table and the filters differ. We plan to extract the shared logic into dbt macros.
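A rough sketch of the idea (the macro name and signature are placeholders, nothing we have shipped yet):
-- Hypothetical macro: one event-source model collapses into a single call
{% macro mau_event_source(table_name, src_type) %}
SELECT src._tenant,
       src.unmergedCustomerId,
       '{{ src_type }}' AS src_type,
       src.endpoint
FROM {{ source('final', table_name) }} src
WHERE src.unmergedCustomerId IS NOT NULL
{% endmacro %}
-- int_mau_events_visits.sql would then shrink to:
{{ config(materialized='table') }}
{{ mau_event_source('customerstracking_visits', 'visits') }}
With a macro like this, adding an eleventh event source becomes a one-line change plus an entry in sources.yaml.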
Expanding the platform’s user base. We want more engineers and analysts to work with data independently. We’re planning regular internal training sessions, documentation, and onboarding guides so new users can get up to speed quickly and start building their own models.
If your team is stuck in the same “analysts wait for developers” loop, I’d love to hear how you’re solving it. Connect with me on LinkedIn and let’s compare notes.
All images in this article are by the author unless otherwise noted.
