Medallion Architecture: Bronze/Silver/Gold layer design patterns and templates for building scalable data lakehouse architectures. Includes incremental processing, data quality checks, and optimization strategies.
def ingest_to_bronze(source_path: str, target_table: str):
    """Ingest raw Parquet files into the Bronze layer as an append-only Delta table.

    Adds lineage columns (``_ingested_at``, ``_source_file``) so every row can
    be traced back to its source file and load time.

    Args:
        source_path: Directory containing the raw Parquet files.
        target_table: Fully qualified name of the target Bronze Delta table.
    """
    # BUG FIX: the "cloudFiles" (Auto Loader) source is streaming-only and
    # cannot be used with batch `spark.read` — read the Parquet files directly.
    # (The streaming Auto Loader variant lives in stream_to_bronze.)
    df = (spark.read
        .format("parquet")
        .load(source_path)
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", input_file_name())
    )
    (df.write
        .format("delta")
        .mode("append")                 # Bronze keeps the full raw history
        .option("mergeSchema", "true")  # tolerate upstream schema drift
        .saveAsTable(target_table)
    )
def process_to_silver(bronze_table: str, silver_table: str):
    """Transform Bronze to Silver with quality checks."""
    raw = spark.read.table(bronze_table)

    # Keep a single row per id and drop rows without a key.
    keyed = raw.dropDuplicates(["id"]).filter(col("id").isNotNull())

    # Normalise the email, derive the created date, and score row quality
    # from whether the (normalised) email looks syntactically valid.
    has_valid_email = col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$")
    cleaned = (
        keyed
        .withColumn("email", lower(trim(col("email"))))
        .withColumn("created_date", to_date(col("created_at")))
        .withColumn("quality_score", when(has_valid_email, 1.0).otherwise(0.5))
    )

    # Full refresh of the Silver table on every run.
    cleaned.write.format("delta").mode("overwrite").saveAsTable(silver_table)
def aggregate_to_gold(silver_table: str, gold_table: str):
    """Aggregate Silver to Gold business metrics."""
    customers = spark.read.table(silver_table)

    # Roll up the core business metrics per segment/region.
    metrics = customers.groupBy("customer_segment", "region").agg(
        count("*").alias("customer_count"),
        sum("lifetime_value").alias("total_ltv"),
        avg("quality_score").alias("avg_quality"),
    )

    # Stamp each refresh so consumers can see how fresh the metrics are.
    stamped = metrics.withColumn("updated_at", current_timestamp())

    # Gold is fully recomputed on every run.
    stamped.write.format("delta").mode("overwrite").saveAsTable(gold_table)
def stream_to_bronze(source_path: str, bronze_table: str, checkpoint_path: str):
    """Incrementally load new JSON files into Bronze via Auto Loader.

    BUG FIX: this was a bare top-level expression referencing undefined
    names (``source_path``, ``checkpoint_path``, ``bronze_table``), so the
    module raised ``NameError`` on import. It is now a parameterized
    function, making the template importable and reusable.

    Args:
        source_path: Directory monitored by Auto Loader for new JSON files.
        bronze_table: Fully qualified name of the target Bronze Delta table.
        checkpoint_path: Stream checkpoint location (tracks processed files).
    """
    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load(source_path)
        .withColumn("_ingested_at", current_timestamp())
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)  # exactly-once progress tracking
        .trigger(availableNow=True)  # drain the backlog, then stop (batch-like run)
        .toTable(bronze_table)
    )
from delta.tables import DeltaTable


def incremental_silver_merge(bronze_table: str, silver_table: str, watermark: str):
    """Incrementally merge new Bronze data into Silver."""
    # Only pick up Bronze rows ingested after the previous run's watermark.
    fresh = spark.read.table(bronze_table).filter(col("_ingested_at") > watermark)

    # Apply the standard Bronze -> Silver transformations.
    prepared = transform_to_silver(fresh)

    # Upsert keyed on id: update existing Silver rows, insert new ones.
    target = DeltaTable.forName(spark, silver_table)
    (target.alias("target")
        .merge(prepared.alias("source"), "target.id = source.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
def validate_silver_quality(table_name: str) -> dict[str, bool]:
    """Run data-quality checks on a Silver table.

    BUG FIX: the annotation used ``Dict``, which is never imported in this
    file, so defining the function raised ``NameError``; the builtin
    ``dict[str, bool]`` needs no import.

    Args:
        table_name: Fully qualified name of the Silver table to validate.

    Returns:
        Mapping of check name to pass (True) / fail (False).
    """
    df = spark.read.table(table_name)
    email_pattern = r"^[\w\.-]+@[\w\.-]+\.\w+$"  # same pattern Silver scoring uses
    checks = {
        # Every row must carry a primary key.
        "no_null_ids": df.filter(col("id").isNull()).count() == 0,
        # NOTE(review): NULL emails pass this check (rlike on NULL yields
        # NULL, which the filter drops) — confirm that is intended.
        "valid_emails": df.filter(~col("email").rlike(email_pattern)).count() == 0,
        # id must be unique across the table.
        "no_duplicates": df.count() == df.select("id").distinct().count(),
        # Dates must fall between the epoch of the dataset and today.
        "within_date_range": df.filter(
            (col("created_date") < "2020-01-01") |
            (col("created_date") > current_date())
        ).count() == 0,
    }
    return checks
-- Bronze: partition raw events by ingestion date for cheap retention and pruning.
CREATE TABLE bronze.raw_events
USING delta
PARTITIONED BY (ingestion_date)
AS SELECT *, current_date() AS ingestion_date FROM source;

-- Enable auto-optimize so small files are compacted at write time.
ALTER TABLE bronze.raw_events
SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);

-- Silver: Z-ORDER on the columns most commonly used in filters.
OPTIMIZE silver.customers
ZORDER BY (customer_segment, region, created_date);

-- Enable Change Data Feed for downstream incremental consumers.
-- FIX: property name/value written as string literals, consistent with the
-- TBLPROPERTIES statement above.
ALTER TABLE silver.customers
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

-- Gold: liquid clustering for query performance on the common access paths.
CREATE TABLE gold.customer_metrics
USING delta
CLUSTER BY (customer_segment, date)
AS SELECT * FROM aggregated_metrics;

-- Compact, then drop data files outside the 7-day (168h) retention window.
OPTIMIZE gold.customer_metrics;
VACUUM gold.customer_metrics RETAIN 168 HOURS;