# Delta Lake — Cloud Storage Integration

Delta Lake integration with cloud object storage (S3, GCS, Azure). Covers `storage_options` credentials, explicit PyArrow filesystems, time travel, table maintenance, and partitioned writes.
"""Delta Lake on cloud object storage (S3, GCS, Azure).

Demonstrates credential passing via ``storage_options``, explicit PyArrow
filesystems, time travel, table maintenance, and checkpoint-based
incremental reads.

Placeholders used throughout (supplied by the caller, not defined here):
``pa_table`` (a ``pyarrow.Table``), ``get_checkpoint`` / ``save_checkpoint``
(your own checkpoint-tracking helpers).
"""

# Install the skill:
#   npx skill4agent add legout/data-platform-agent-skills \
#       data-engineering-storage-remote-access-integrations-delta-lake
# Python dependencies:
#   pip install deltalake pyarrow

import pyarrow as pa
import pyarrow.fs as fs
from deltalake import DeltaTable, write_deltalake

# --- S3 configuration -------------------------------------------------
storage_options = {
    "AWS_ACCESS_KEY_ID": "AKIA...",
    "AWS_SECRET_ACCESS_KEY": "...",
    "AWS_REGION": "us-east-1",
}
# Alternatively, use environment variables (preferred for production):
# os.environ['AWS_ACCESS_KEY_ID'], etc.

# Write a partitioned Delta table to S3.
write_deltalake(
    "s3://bucket/delta-table",
    data=pa_table,
    storage_options=storage_options,
    mode="overwrite",
    partition_by=["date"],
)

# Read the Delta table back.
dt = DeltaTable(
    "s3://bucket/delta-table",
    storage_options=storage_options,
)
df = dt.to_pandas()

# --- GCS configuration ------------------------------------------------
storage_options = {
    "GOOGLE_SERVICE_ACCOUNT_KEY_JSON": "/path/to/key.json",
    # Or use env var GOOGLE_APPLICATION_CREDENTIALS
}

# --- Azure configuration ----------------------------------------------
storage_options = {
    "AZURE_STORAGE_CONNECTION_STRING": "...",
    # OR: "AZURE_STORAGE_ACCOUNT_NAME" + "AZURE_STORAGE_ACCOUNT_KEY"
}

# --- Explicit PyArrow filesystem --------------------------------------
# Split the URI into a filesystem and a subpath, then root the
# filesystem at that subpath so table paths can be relative.
raw_fs, subpath = fs.FileSystem.from_uri("s3://bucket/delta-table")
filesystem = fs.SubTreeFileSystem(subpath, raw_fs)

# Write (path is relative to the filesystem root).
write_deltalake(
    "delta-table",
    data=pa_table,
    filesystem=filesystem,
    mode="append",
)

# Read.
dt = DeltaTable("delta-table", filesystem=filesystem)

# --- Time travel ------------------------------------------------------
dt = DeltaTable("s3://bucket/delta-table")

# Load a specific table version in place.
dt.load_version(5)
df_v5 = dt.to_pandas()

# Load the state of the table at a timestamp.
dt.load_with_datetime("2024-01-01T12:00:00Z")
df_ts = dt.to_pandas()

# Inspect the commit history.
# NOTE(review): in current deltalake Python bindings, ``history()``
# returns a list of dicts — confirm the ``.to_pandas()`` call against
# the installed version.
history = dt.history().to_pandas()
print(history[["version", "timestamp", "operation"]])

# --- Maintenance ------------------------------------------------------
dt.vacuum(retention_hours=24)  # clean up files older than 24h (retention in hours)
dt.optimize().execute()        # compaction: combine small files

# List the Parquet data files backing the table.
files = dt.files()
print(files)

# Table metadata.
# NOTE(review): ``details()`` may be ``metadata()`` depending on the
# deltalake version — verify against the installed API.
details = dt.details()
print(details)

# --- Incremental reads via checkpointing ------------------------------
dt = DeltaTable("s3://bucket/delta-table")

# Get changes since the last processed version (your own tracking).
last_version = get_checkpoint()

# Read only commits newer than the checkpoint.
# NOTE(review): this ``history().filter(...)`` chain assumes a
# table-like return from ``history()`` — confirm before relying on it.
changes = (
    dt.history()
    .filter(f"version > {last_version}")
    .to_pyarrow_table()
)

# Or read the full snapshot and diff it against the previous one.
df = dt.to_pandas()
# ... compare with previous snapshot ...

# Advance the checkpoint to the table's current version.
save_checkpoint(dt.version())

# Key APIs: dt.version(), dt.history()
# Related skills: @data-engineering-storage-authentication
#   (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION,
#    AWS_ENDPOINT_URL, storage_options),
#   @data-engineering-storage-lakehouse/delta-lake,
#   @data-engineering-core, @data-engineering-storage-lakehouse