data-engineering-storage-remote-access-integrations-delta-lake
# Delta Lake on Cloud Storage

Integrating Delta Lake tables with cloud storage (S3, GCS, Azure) using the pure-Python `deltalake` package.

## Installation

```bash
pip install deltalake pyarrow
```

## Configuration Patterns

### Method 1: storage_options (Recommended)

The simplest approach uses dictionary-based configuration:

```python
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

# S3 configuration
storage_options = {
    "AWS_ACCESS_KEY_ID": "AKIA...",
    "AWS_SECRET_ACCESS_KEY": "...",
    "AWS_REGION": "us-east-1"
}
# Alternatively, use environment variables (preferred for production):
# os.environ['AWS_ACCESS_KEY_ID'], etc.

# Example data
pa_table = pa.table({"date": ["2024-01-01"], "value": [1]})

# Write Delta table
write_deltalake(
    "s3://bucket/delta-table",
    data=pa_table,
    storage_options=storage_options,
    mode="overwrite",
    partition_by=["date"]
)

# Read Delta table
dt = DeltaTable(
    "s3://bucket/delta-table",
    storage_options=storage_options
)
df = dt.to_pandas()
```
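As a sketch of the environment-variable approach mentioned above, credentials can be exported before the process starts and the `storage_options` argument omitted entirely (the values here are placeholders, not real credentials):

```python
import os

# Placeholder credentials; in production these come from the runtime
# environment (CI secrets, instance profile, etc.), never from code
os.environ.setdefault("AWS_ACCESS_KEY_ID", "AKIA...")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "...")
os.environ.setdefault("AWS_REGION", "us-east-1")

# With the env vars set, no storage_options is needed:
# dt = DeltaTable("s3://bucket/delta-table")
```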
**GCS configuration:**

```python
storage_options = {
    "GOOGLE_SERVICE_ACCOUNT_KEY_JSON": "/path/to/key.json"
    # Or use env var GOOGLE_APPLICATION_CREDENTIALS
}
```

**Azure configuration:**

```python
storage_options = {
    "AZURE_STORAGE_CONNECTION_STRING": "...",
    # OR: "AZURE_STORAGE_ACCOUNT_NAME" + "AZURE_STORAGE_ACCOUNT_KEY"
}
```

### Method 2: PyArrow Filesystem (Advanced)
Use PyArrow filesystem objects for more control:

```python
import pyarrow.fs as fs
from deltalake import DeltaTable, write_deltalake

# Create filesystem
raw_fs, subpath = fs.FileSystem.from_uri("s3://bucket/delta-table")
filesystem = fs.SubTreeFileSystem(subpath, raw_fs)

# Write
write_deltalake(
    "delta-table",  # relative to the filesystem root
    data=pa_table,
    filesystem=filesystem,
    mode="append"
)

# Read
dt = DeltaTable("delta-table", filesystem=filesystem)
```
## Time Travel

```python
import pandas as pd
from deltalake import DeltaTable

dt = DeltaTable("s3://bucket/delta-table")

# Load specific version
dt.load_version(5)
df_v5 = dt.to_pandas()

# Load by timestamp
dt.load_with_datetime("2024-01-01T12:00:00Z")
df_ts = dt.to_pandas()

# Get history; history() returns a list of dicts, one per commit
history = pd.DataFrame(dt.history())
print(history[["version", "timestamp", "operation"]])
```
## Maintenance Operations

```python
# Vacuum old files (retention in hours). Note that vacuum() dry-runs
# by default, and refuses a retention below the table minimum unless
# enforcement is disabled.
dt.vacuum(
    retention_hours=24,  # clean files older than 24h
    dry_run=False,
    enforce_retention_duration=False,
)

# Optimize compaction (combine small files)
dt.optimize.compact()

# Get file list
files = dt.files()
print(files)  # list of Parquet files in the table

# Get metadata
print(dt.metadata())
```
## Incremental Processing

For change data capture (CDC) patterns:

```python
from deltalake import DeltaTable

dt = DeltaTable("s3://bucket/delta-table")

# Get changes since the last checkpoint
last_version = get_checkpoint()  # your checkpoint tracking

# Inspect commits made after the checkpoint
# (history() returns a list of dicts)
changes = [
    entry for entry in dt.history()
    if entry["version"] > last_version
]

# Or read the full snapshot and compare
df = dt.to_pandas()
# ... compare with previous snapshot ...

# Update checkpoint
save_checkpoint(dt.version())
```
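The `get_checkpoint()`/`save_checkpoint()` helpers above are placeholders, not part of the `deltalake` API. A minimal sketch that persists the last processed version in a local JSON file (the file name is illustrative; in production the checkpoint might live in object storage or a metadata database):

```python
import json
from pathlib import Path

# Hypothetical checkpoint location
CHECKPOINT_PATH = Path("delta_checkpoint.json")


def get_checkpoint() -> int:
    """Return the last processed table version, or -1 if none is saved."""
    if CHECKPOINT_PATH.exists():
        return json.loads(CHECKPOINT_PATH.read_text())["version"]
    return -1


def save_checkpoint(version: int) -> None:
    """Persist the latest processed table version."""
    CHECKPOINT_PATH.write_text(json.dumps({"version": version}))
```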
## Best Practices

- ✅ Use environment variables for credentials in production (never hardcode)
- ✅ Partition tables by date/region for efficient querying
- ✅ Vacuum regularly to clean up old files (but retain enough history for your time-travel needs)
- ✅ Optimize periodically to compact small files
- ✅ Track versions for incremental processing using `dt.version()` and `dt.history()`
- ⚠️ Don't disable vacuum entirely - storage will bloat
- ⚠️ Don't vacuum too aggressively - you'll lose time-travel capability
## Authentication

See @data-engineering-storage-authentication for detailed cloud auth patterns.

For S3:

- Environment variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`
- IAM roles (EC2, ECS, Lambda) override env vars
- For S3-compatible stores (MinIO): set `AWS_ENDPOINT_URL` in the environment or in `storage_options`
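For example, pointing the S3 backend at a local MinIO deployment might look like this (the endpoint, credentials, and bucket are placeholders for a hypothetical setup):

```python
# Hypothetical MinIO endpoint and credentials
storage_options = {
    "AWS_ENDPOINT_URL": "http://localhost:9000",  # assumed local MinIO
    "AWS_ALLOW_HTTP": "true",                     # endpoint is plain HTTP
    "AWS_ACCESS_KEY_ID": "minioadmin",
    "AWS_SECRET_ACCESS_KEY": "minioadmin",
    "AWS_REGION": "us-east-1",
}

# from deltalake import DeltaTable
# dt = DeltaTable("s3://my-bucket/delta-table", storage_options=storage_options)
```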
## Related

- @data-engineering-storage-lakehouse/delta-lake - Delta Lake concepts and API
- @data-engineering-core - Using Delta with DuckDB
- @data-engineering-storage-lakehouse - Comparisons with Iceberg, Hudi