Loading...
Loading...
Transform raw data into analytical assets using ETL/ELT patterns, SQL (dbt), Python (pandas/polars/PySpark), and orchestration (Airflow). Use when building data pipelines, implementing incremental models, migrating from pandas to polars, or orchestrating multi-step transformations with testing and quality checks.
npx skill4agent add neversight/skills_feed transforming-data
{{
config(
materialized='incremental',
unique_key='order_id'
)
}}
select order_id, customer_id, order_created_at, sum(revenue) as total_revenue
from {{ ref('int_order_items_joined') }}
{% if is_incremental() %}
where order_created_at > (select max(order_created_at) from {{ this }})
{% endif %}
group by 1, 2, 3
import polars as pl
result = (
pl.scan_csv('large_dataset.csv')
.filter(pl.col('year') == 2024)
.with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
.group_by('region')
.agg(pl.col('revenue').sum())
.collect() # Execute lazy query
)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
with DAG(
dag_id='daily_sales_pipeline',
schedule_interval='0 2 * * *',
default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
start_date=datetime(2024, 1, 1),
catchup=False
) as dag:
extract = PythonOperator(task_id='extract', python_callable=extract_data)
transform = PythonOperator(task_id='transform', python_callable=transform_data)
extract >> transform
references/etl-vs-elt-patterns.md
references/dataframe-comparison.md
dbt_assets
references/orchestration-patterns.md
models/staging/
models/intermediate/
models/marts/
models:
- name: fct_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- relationships:
to: ref('dim_customers')
field: customer_id
- name: total_revenue
tests:
- dbt_utils.accepted_range:
min_value: 0
references/dbt-best-practices.md
references/incremental-strategies.md
import pandas as pd
df = pd.read_csv('sales.csv')
result = (
df
.query('year == 2024')
.assign(revenue=lambda x: x['quantity'] * x['price'])
.groupby('region')
.agg({'revenue': ['sum', 'mean']})
)
import polars as pl
result = (
pl.scan_csv('sales.csv') # Lazy evaluation
.filter(pl.col('year') == 2024)
.with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
.group_by('region')
.agg([
pl.col('revenue').sum().alias('revenue_sum'),
pl.col('revenue').mean().alias('revenue_mean')
])
.collect() # Execute lazy query
)
scan_csv()
read_csv()
with_columns()
assign()
pl.col()
collect()
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("Transform").getOrCreate()
df = spark.read.csv('sales.csv', header=True, inferSchema=True)
result = (
df
.filter(F.col('year') == 2024)
.withColumn('revenue', F.col('quantity') * F.col('price'))
.groupBy('region')
.agg(F.sum('revenue').alias('total_revenue'))
)
references/dataframe-comparison.md
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='data_pipeline',
default_args=default_args,
schedule_interval='0 2 * * *', # Daily at 2 AM
start_date=datetime(2024, 1, 1),
catchup=False
) as dag:
task1 = PythonOperator(task_id='extract', python_callable=extract_fn)
task2 = PythonOperator(task_id='transform', python_callable=transform_fn)
task1 >> task2 # Define dependency
A >> B >> C
A >> [B, C, D]
[A, B, C] >> D
references/orchestration-patterns.md
-- tests/assert_positive_revenue.sql
select * from {{ ref('fct_orders') }}
where total_revenue < 0
import great_expectations as gx
context = gx.get_context()
suite = context.add_expectation_suite("orders_suite")
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="total_revenue", min_value=0
)
)
references/data-quality-testing.md
select
order_date,
daily_revenue,
avg(daily_revenue) over (
partition by region
order by order_date
rows between 6 preceding and current row
) as revenue_7d_ma,
sum(daily_revenue) over (
partition by region
order by order_date
) as cumulative_revenue
from daily_sales
references/window-functions-guide.md
merge
unique_key
{% if is_incremental() %}
where created_at > (select max(created_at) from {{ this }})
{% endif %}
try:
result = perform_transformation()
validate_result(result)
except ValidationError as e:
log_error(e)
raise
pip install dbt-core dbt-snowflake
pip install polars
pip install apache-airflow
examples/python/pandas-basics.py
examples/python/polars-migration.py
examples/python/pyspark-transformations.py
examples/python/airflow-data-pipeline.py
examples/sql/dbt-staging-model.sql
examples/sql/dbt-intermediate-model.sql
examples/sql/dbt-incremental-model.sql
examples/sql/window-functions.sql
scripts/generate_dbt_models.py
scripts/benchmark_dataframes.py
ingesting-data
visualizing-data
databases-*
streaming-data
ai-data-engineering
observability