# DuckDB: Remote Cloud Storage Access
Using DuckDB with remote cloud storage via HTTPFS extension, fsspec, and Delta Lake integration. Covers S3, GCS, Azure, and S3-compatible endpoints.
# Install skill:
#   npx skill4agent add legout/data-platform-agent-skills data-engineering-storage-remote-access-integrations-duckdb
import duckdb
from contextlib import contextmanager


@contextmanager
def get_duckdb_connection(database=":memory:"):
    """Yield a DuckDB connection with the httpfs extension loaded.

    Context manager ensures the connection is always closed, even if the
    body raises.

    Args:
        database: Path to a DuckDB database file; defaults to an
            in-memory database (same behavior as the original
            zero-argument ``duckdb.connect()``).

    Yields:
        duckdb.DuckDBPyConnection: open connection with httpfs available
        for s3://, gs://, and https:// URIs.
    """
    con = duckdb.connect(database)
    try:
        # httpfs enables reading/writing remote objects (S3, GCS, HTTP).
        # NOTE: INSTALL downloads the extension on first use — needs network.
        con.execute("INSTALL httpfs; LOAD httpfs;")
        yield con
    finally:
        con.close()
# Configure and query through the managed connection.
with get_duckdb_connection() as con:
    # S3 configuration.
    # NOTE(review): the key values below are placeholders — never hard-code
    # real credentials in source; prefer env vars or an IAM/credential chain.
    con.execute("""
        SET s3_region='us-east-1';
        SET s3_access_key_id='AKIA...';
        SET s3_secret_access_key='...';
        -- For temporary creds: SET s3_session_token='...'
        -- For S3-compatible: SET s3_endpoint='http://minio:9000';
    """)
    # Query Parquet directly from S3; .pl() materializes a Polars DataFrame.
    df = con.sql("""
        SELECT category, SUM(value) as total
        FROM read_parquet('s3://bucket/data/*.parquet')
        WHERE date >= '2024-01-01'
        GROUP BY category
    """).pl()
    # Read from GCS (configure via environment or default credentials)
    df = con.sql("SELECT * FROM read_csv('gs://bucket/data.csv')").pl()

import os
# Alternative: supply S3 credentials via the standard AWS environment
# variables (placeholder values — set these from a secrets manager in practice).
import os

os.environ['AWS_ACCESS_KEY_ID'] = 'AKIA...'
os.environ['AWS_SECRET_ACCESS_KEY'] = '...'
os.environ['AWS_REGION'] = 'us-east-1'
# DuckDB HTTPFS reads these automatically on first use
import duckdb

# Context manager guarantees the connection is closed (the original
# snippet leaked it, contradicting this file's own DO/DON'T guidance).
with duckdb.connect() as con:
    con.execute("INSTALL httpfs; LOAD httpfs;")
    df = con.sql("SELECT * FROM read_parquet('s3://bucket/data.parquet')").pl()

import fsspec  # used by the filesystem-registration example that follows
import fsspec
import duckdb

# Register GCS (or any fsspec protocol) with DuckDB's default connection.
duckdb.register_filesystem(fsspec.filesystem('gcs'))
# Now use gcs:// URIs natively
df = duckdb.sql("""
    SELECT * FROM read_parquet('gcs://bucket/data.parquet')
""").pl()
import duckdb

with duckdb.connect() as con:
    # Load httpfs explicitly before touching s3:// URIs — every other
    # snippet in this file does so; relying on implicit autoloading is
    # version-dependent.
    con.execute("INSTALL httpfs; LOAD httpfs;")
    # Export table to S3
    con.sql("""
        COPY (SELECT * FROM my_table)
        TO 's3://bucket/output.parquet'
        (FORMAT PARQUET)
    """)
    # Import from S3
    con.sql("""
        CREATE TABLE imported AS
        SELECT * FROM read_parquet('s3://bucket/input.parquet')
    """)
import duckdb

with duckdb.connect() as con:
    # Delta extension provides delta_scan() for Delta Lake tables.
    con.execute("INSTALL delta; LOAD delta;")
    # Query Delta table
    df = con.sql("""
        SELECT * FROM delta_scan('s3://bucket/delta-table/')
        WHERE date >= '2024-01-01'
    """).pl()
    # Time travel (read specific version)
    df = con.sql("""
        SELECT * FROM delta_scan('s3://bucket/delta-table/', version => 5)
    """).pl()
# ✅ DO: Use context manager
with duckdb.connect("analytics.db") as con:
    con.sql("CREATE TABLE ...")

# ❌ DON'T: Leak connections
con = duckdb.connect("analytics.db")
con.sql("...")  # Never closed → leak

# ✅ DO: If you must, manually close
con = duckdb.connect("analytics.db")
try:
    con.sql("...")
finally:
    con.close()

# Related skill: @data-engineering-storage-authentication
# Env vars read by HTTPFS: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
# Inspect query plans with EXPLAIN: con.sql("EXPLAIN SELECT ...").pl()
# Related skill: @data-engineering-core