data-engineering-storage-remote-access-integrations-delta-lake


Delta Lake on Cloud Storage


Integrating Delta Lake tables with cloud storage (S3, GCS, Azure) using the pure-Python `deltalake` package.

Installation


```bash
pip install deltalake pyarrow
```

Configuration Patterns


Method 1: storage_options (Recommended)


The simplest approach uses dictionary-based configuration:

```python
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

# S3 configuration
storage_options = {
    "AWS_ACCESS_KEY_ID": "AKIA...",
    "AWS_SECRET_ACCESS_KEY": "...",
    "AWS_REGION": "us-east-1",
}

# Alternatively, use environment variables (preferred for production):
# os.environ['AWS_ACCESS_KEY_ID'], etc.

# Write Delta table
write_deltalake(
    "s3://bucket/delta-table",
    data=pa_table,
    storage_options=storage_options,
    mode="overwrite",
    partition_by=["date"],
)

# Read Delta table
dt = DeltaTable("s3://bucket/delta-table", storage_options=storage_options)
df = dt.to_pandas()
```

**GCS configuration:**
```python
storage_options = {
    "GOOGLE_SERVICE_ACCOUNT_KEY_JSON": "/path/to/key.json"
    # Or use env var GOOGLE_APPLICATION_CREDENTIALS
}
```

**Azure configuration:**
```python
storage_options = {
    "AZURE_STORAGE_CONNECTION_STRING": "...",
    # OR: "AZURE_STORAGE_ACCOUNT_NAME" + "AZURE_STORAGE_ACCOUNT_KEY"
}
```
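For the production case mentioned above, one hedged sketch of forwarding credentials from the environment instead of hardcoding them (the placeholder values here are purely illustrative):

```python
import os

# Illustrative placeholders only; real values come from your secret manager,
# CI, or container environment, never from source code.
os.environ["AWS_ACCESS_KEY_ID"] = "AKIA..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_REGION"] = "us-east-1"

# Forward the variables into storage_options
storage_options = {
    key: os.environ[key]
    for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION")
}
```

When these variables are set, the `deltalake` package can typically pick them up directly, so passing `storage_options` explicitly becomes optional.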

Method 2: PyArrow Filesystem (Advanced)


Use PyArrow filesystem objects for more control:

```python
import pyarrow.fs as fs
from deltalake import write_deltalake, DeltaTable

# Create filesystem
raw_fs, subpath = fs.FileSystem.from_uri("s3://bucket/delta-table")
filesystem = fs.SubTreeFileSystem(subpath, raw_fs)

# Write
write_deltalake(
    "delta-table",  # relative to filesystem root
    data=pa_table,
    filesystem=filesystem,
    mode="append",
)

# Read
dt = DeltaTable("delta-table", filesystem=filesystem)
```

Time Travel


```python
import pandas as pd
from deltalake import DeltaTable

dt = DeltaTable("s3://bucket/delta-table")

# Load a specific version
dt.load_version(5)
df_v5 = dt.to_pandas()

# Load by timestamp
dt.load_with_datetime("2024-01-01T12:00:00Z")
df_ts = dt.to_pandas()

# Get history; history() returns a list of commit-metadata dicts
history = pd.DataFrame(dt.history())
print(history[["version", "timestamp", "operation"]])
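Building the RFC-3339 timestamp string from a `datetime` object avoids format mistakes. A small sketch (variable names are illustrative):

```python
from datetime import datetime, timezone

# Point in time to travel to (UTC)
as_of = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

# isoformat() emits "+00:00" for UTC; the "Z" suffix form is equivalent
timestamp_str = as_of.isoformat().replace("+00:00", "Z")
# timestamp_str can then be passed to dt.load_with_datetime(...)
```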

Maintenance Operations


```python
# Vacuum old files (retention in hours). Note: deltalake's vacuum() defaults to
# dry_run=True, and a retention below the 7-day default also requires
# enforce_retention_duration=False.
dt.vacuum(retention_hours=24, dry_run=False, enforce_retention_duration=False)

# Optimize compaction (combine small files)
dt.optimize.compact()

# Get file list
files = dt.files()
print(files)  # list of Parquet files in the table

# Get table metadata
print(dt.metadata())
```
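Since `retention_hours` must cover your time-travel window (see Best Practices below), it can help to derive it from a policy rather than hardcoding a number. A sketch, assuming a hypothetical 7-day window with a 1-day safety margin:

```python
from datetime import timedelta

# Policy: support 7 days of time travel, plus a safety margin
time_travel_window = timedelta(days=7)
safety_margin = timedelta(days=1)

retention_hours = int((time_travel_window + safety_margin).total_seconds() // 3600)
# Pass the result to dt.vacuum(retention_hours=retention_hours, ...)
```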

Incremental Processing


For change data capture (CDC) patterns:

```python
from deltalake import DeltaTable

dt = DeltaTable("s3://bucket/delta-table")

# Get changes since last checkpoint
last_version = get_checkpoint()  # your checkpoint tracking

# Inspect commits made after the checkpoint; history() returns a list of dicts
changes = [entry for entry in dt.history() if entry["version"] > last_version]

# Or read the full snapshot and compare
df = dt.to_pandas()
# ... compare with previous snapshot ...

# Update checkpoint
save_checkpoint(dt.version())
```
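The `get_checkpoint()` and `save_checkpoint()` helpers above are left to the reader. One minimal file-based sketch (the path and JSON layout are assumptions, not part of the deltalake API):

```python
import json
import os

CHECKPOINT_PATH = "delta_checkpoint.json"  # hypothetical location

def get_checkpoint(path: str = CHECKPOINT_PATH) -> int:
    """Return the last processed table version, or -1 if none is recorded."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)["version"]
    return -1

def save_checkpoint(version: int, path: str = CHECKPOINT_PATH) -> None:
    """Persist the latest processed table version."""
    with open(path, "w") as f:
        json.dump({"version": version}, f)
```

In multi-writer pipelines the checkpoint store should be transactional (e.g. a database row) rather than a local file.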

Best Practices


  1. Use environment variables for credentials in production (never hardcode)
  2. Partition tables by date/region for efficient querying
  3. Vacuum regularly to clean up old files (but retain enough history for your time travel needs)
  4. Optimize periodically to compact small files
  5. Track versions for incremental processing using `dt.version()` and `dt.history()`
  6. ⚠️ Don't disable vacuum entirely (storage will bloat)
  7. ⚠️ Don't vacuum too aggressively (you'll lose time travel capability)

Authentication


See @data-engineering-storage-authentication for detailed cloud auth patterns.

For S3:
  • Environment: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`
  • IAM roles (EC2, ECS, Lambda) override env vars
  • For S3-compatible stores (MinIO): set `AWS_ENDPOINT_URL` in the environment or in `storage_options`
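For an S3-compatible endpoint such as MinIO, the bullet above translates into `storage_options` roughly like this (host, port, and credentials are placeholders for a local test instance):

```python
# Placeholder endpoint and credentials for a hypothetical local MinIO server
storage_options = {
    "AWS_ACCESS_KEY_ID": "minioadmin",
    "AWS_SECRET_ACCESS_KEY": "minioadmin",
    "AWS_ENDPOINT_URL": "http://localhost:9000",
    "AWS_ALLOW_HTTP": "true",  # plain-HTTP endpoints are rejected without this
}
```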

Related


  • @data-engineering-storage-lakehouse/delta-lake - Delta Lake concepts and API
  • @data-engineering-core - Using Delta with DuckDB
  • @data-engineering-storage-lakehouse - Comparisons with Iceberg, Hudi
