# Parquet-Coder
Patterns for efficient columnar data storage with Parquet.
## Basic Operations
```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Write with compression
df.to_parquet('data.parquet', compression='snappy', index=False)

# Common compression options (compared empirically in the sketch after this block):
# - snappy: Fast, good compression (default)
# - gzip: Slower, better compression
# - zstd: Best balance of speed/compression
# - None: No compression (fastest writes)

# Read entire file
df = pd.read_parquet('data.parquet')

# Read specific columns only (column pruning, a.k.a. projection pushdown)
df = pd.read_parquet('data.parquet', columns=['id', 'name', 'value'])
```
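The codec trade-offs above are easiest to judge empirically. A minimal sketch (the DataFrame and probe file names are placeholders, not part of any API) that writes the same frame once per codec and compares file sizes:

```python
import os

import pandas as pd


def compare_codecs(df: pd.DataFrame, codecs=('snappy', 'gzip', 'zstd', None)) -> dict:
    """Write df once per codec and return the resulting file sizes in bytes."""
    sizes = {}
    for codec in codecs:
        path = f"probe_{codec or 'none'}.parquet"  # temporary probe file
        df.to_parquet(path, compression=codec, index=False)
        sizes[str(codec)] = os.path.getsize(path)
        os.remove(path)
    return sizes
```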
## PyArrow for Large Files
```python
# Read as PyArrow Table (more memory efficient)
table = pq.read_table('data.parquet')

# Convert to pandas when needed
df = table.to_pandas()

# Filter while reading (row group filtering)
table = pq.read_table(
    'data.parquet',
    filters=[
        ('date', '>=', '2024-01-01'),
        ('status', '=', 'active')
    ]
)

# Read in batches for huge files
parquet_file = pq.ParquetFile('huge.parquet')
for batch in parquet_file.iter_batches(batch_size=100_000):
    df_batch = batch.to_pandas()
    process(df_batch)
```
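The batch loop can also stream an aggregate so the full file never has to fit in memory. A small sketch assuming a numeric `value` column (the column name is illustrative):

```python
# Compute the mean of one column in 100k-row batches, reading only that column.
parquet_file = pq.ParquetFile('huge.parquet')
total, rows = 0.0, 0
for batch in parquet_file.iter_batches(batch_size=100_000, columns=['value']):
    values = batch.to_pandas()['value']
    total += values.sum()
    rows += len(values)
mean_value = total / rows if rows else float('nan')
```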
## Partitioned Datasets
```python
# Write partitioned by columns
df.to_parquet(
    'data/',
    partition_cols=['year', 'month'],
    compression='snappy'
)
# Creates: data/year=2024/month=01/part-0.parquet

# Read partitioned dataset
df = pd.read_parquet('data/')  # Reads all partitions

# Read specific partitions only
df = pd.read_parquet('data/year=2024/')

# With PyArrow dataset API (more control)
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet', partitioning='hive')

# Filter on partition columns (very fast)
table = dataset.to_table(
    filter=(ds.field('year') == 2024) & (ds.field('month') >= 6)
)
```
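Partition pruning combines with column projection in a single scan. A sketch against the layout above (the `id`, `value`, and `status` columns are illustrative):

```python
import pyarrow.dataset as ds

dataset = ds.dataset('data/', format='parquet', partitioning='hive')

# Only the listed columns are read, and only fragments whose partition
# values and row group statistics can match the filter are scanned.
table = dataset.to_table(
    columns=['id', 'value'],
    filter=(ds.field('year') == 2024) & (ds.field('status') == 'active')
)
df = table.to_pandas()
```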
## Schema Definition
```python
# Explicit schema for consistency
schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('value', pa.float64()),
    ('date', pa.date32()),
    ('tags', pa.list_(pa.string())),
    ('metadata', pa.map_(pa.string(), pa.string())),
])

# Write with schema
table = pa.Table.from_pandas(df, schema=schema)
pq.write_table(table, 'data.parquet')

# Read and validate schema
file_schema = pq.read_schema('data.parquet')
assert file_schema.equals(schema), "Schema mismatch!"
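```

When the bare assert is not informative enough, a field-by-field check reports exactly what drifted. A sketch using only pyarrow (the function is ours, not a library API):

```python
def validate_schema(path: str, expected: pa.Schema) -> None:
    """Raise ValueError with a field-by-field report of schema drift."""
    actual = pq.read_schema(path)
    problems = []
    for field in expected:
        if field.name not in actual.names:
            problems.append(f"missing column: {field.name}")
        elif not actual.field(field.name).type.equals(field.type):
            problems.append(
                f"{field.name}: expected {field.type}, got {actual.field(field.name).type}"
            )
    extra = set(actual.names) - set(expected.names)
    if extra:
        problems.append(f"unexpected columns: {sorted(extra)}")
    if problems:
        raise ValueError("Schema mismatch: " + "; ".join(problems))
```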
## Schema Evolution
```python
def merge_schemas(old_schema: pa.Schema, new_schema: pa.Schema) -> pa.Schema:
    """Create unified schema from old and new."""
    fields = {f.name: f for f in old_schema}
    for field in new_schema:
        if field.name not in fields:
            fields[field.name] = field
        elif fields[field.name].type != field.type:
            # Handle type conflicts (e.g., promote int to float);
            # promote_type is a user-supplied helper that picks the wider type
            fields[field.name] = pa.field(
                field.name,
                promote_type(fields[field.name].type, field.type)
            )
    return pa.schema(list(fields.values()))


def append_with_schema_evolution(
    existing_path: str,
    new_df: pd.DataFrame,
    output_path: str
) -> None:
    """Append data with automatic schema evolution."""
    existing = pq.read_table(existing_path)
    new_table = pa.Table.from_pandas(new_df)

    # Unify schemas (keeps every column that exists in either table)
    unified_schema = pa.unify_schemas([existing.schema, new_table.schema])

    def conform(table: pa.Table) -> pa.Table:
        # Table.cast alone fails when the column sets differ, so add null
        # columns for missing fields before reordering and casting
        for field in unified_schema:
            if field.name not in table.schema.names:
                table = table.append_column(
                    field, pa.nulls(table.num_rows, type=field.type)
                )
        return table.select(unified_schema.names).cast(unified_schema)

    # Concatenate and write
    combined = pa.concat_tables([conform(existing), conform(new_table)])
    pq.write_table(combined, output_path)
```

## Row Group Optimization
```python
# Control row group size (affects read performance)
pq.write_table(
    table,
    'data.parquet',
    row_group_size=100_000,  # Rows per group
    compression='snappy'
)

# Read metadata to see row groups
parquet_file = pq.ParquetFile('data.parquet')
print(f"Num row groups: {parquet_file.metadata.num_row_groups}")
print(f"Num rows: {parquet_file.metadata.num_rows}")

# Read specific row groups
table = parquet_file.read_row_groups([0, 1])  # First two groups
```
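To sanity-check whether the chosen `row_group_size` produces sensibly sized groups, the footer metadata already has the numbers. A quick sketch (file name is illustrative):

```python
# Report rows and uncompressed bytes per row group
parquet_file = pq.ParquetFile('data.parquet')
for i in range(parquet_file.metadata.num_row_groups):
    rg = parquet_file.metadata.row_group(i)
    print(f"group {i}: {rg.num_rows} rows, {rg.total_byte_size / 1e6:.1f} MB uncompressed")
```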
## Metadata and Statistics
```python
# Add custom metadata
custom_metadata = {
    b'created_by': b'etl_pipeline',
    b'version': b'1.0',
    b'source': b'api_export'
}
schema = table.schema.with_metadata(custom_metadata)
table = table.cast(schema)
pq.write_table(table, 'data.parquet')

# Read metadata (key/value metadata lives on the Arrow schema)
parquet_file = pq.ParquetFile('data.parquet')
print(parquet_file.schema_arrow.metadata)

# Column statistics (min/max for filtering)
metadata = parquet_file.metadata
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        if col.statistics:
            print(f"{col.path_in_schema}: min={col.statistics.min}, max={col.statistics.max}")
```
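Those min/max statistics are what filters use to skip row groups. A sketch of doing that pruning by hand (the column name and value range are illustrative, and the helper is ours, not a library API):

```python
def matching_row_groups(path: str, column: str, lo, hi) -> list:
    """Indices of row groups whose [min, max] for `column` can overlap [lo, hi]."""
    pf = pq.ParquetFile(path)
    keep = []
    for i in range(pf.metadata.num_row_groups):
        rg = pf.metadata.row_group(i)
        stats = None
        for j in range(rg.num_columns):
            if rg.column(j).path_in_schema == column:
                stats = rg.column(j).statistics
                break
        # Keep the group when there are no usable stats (cannot prune)
        # or when its value range overlaps the requested one
        if stats is None or not stats.has_min_max or (stats.min <= hi and stats.max >= lo):
            keep.append(i)
    return keep


# Usage: read only the row groups that can contain value in [0, 100]
pf = pq.ParquetFile('data.parquet')
table = pf.read_row_groups(matching_row_groups('data.parquet', 'value', 0, 100))
```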
## Delta Lake Integration
```python
from deltalake import DeltaTable, write_deltalake

# Write as Delta table (versioned Parquet)
write_deltalake('delta_table/', df, mode='overwrite')

# Append data
write_deltalake('delta_table/', new_df, mode='append')

# Read Delta table
dt = DeltaTable('delta_table/')
df = dt.to_pandas()

# Time travel: read an earlier version of the table
df_old = DeltaTable('delta_table/', version=0).to_pandas()

# Compact small files
dt.optimize.compact()

# Vacuum old versions
dt.vacuum(retention_hours=168)  # Keep 7 days
```
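The commit history is also queryable. A sketch assuming the deltalake package's `version()` and `history()` methods (check your installed version's API for the exact entry keys):

```python
dt = DeltaTable('delta_table/')
print(dt.version())  # current version number

# Most recent commits, newest first
for entry in dt.history(limit=5):
    print(entry.get('timestamp'), entry.get('operation'))
```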
## Performance Tips
```python
# 1. Use appropriate column types
schema = pa.schema([
    ('category', pa.dictionary(pa.int8(), pa.string())),  # For repeated strings
    ('count', pa.int32()),  # Not int64 if values fit
])

# 2. Sort data before writing (improves predicate pushdown)
df = df.sort_values(['date', 'category'])
df.to_parquet('sorted.parquet')

# 3. Use column selection
df = pd.read_parquet('data.parquet', columns=['needed', 'columns'])

# 4. Use filters for row pruning
df = pd.read_parquet('data/', filters=[('status', '==', 'active')])

# 5. Parallel reads
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet')
table = dataset.to_table(use_threads=True)
```
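Tip 1 can also be applied from the pandas side: converting a repeated-string column to `category` dtype makes pyarrow dictionary-encode it on write, which usually shrinks the file. A minimal sketch (column and file names are illustrative):

```python
# Dictionary-encode a low-cardinality string column before writing
df['category'] = df['category'].astype('category')
df.to_parquet('dict_encoded.parquet', compression='zstd')
```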
## Conversion Utilities
```python
def csv_to_parquet(
    csv_path: str,
    parquet_path: str,
    chunk_size: int = 100_000
) -> None:
    """Convert large CSV to Parquet efficiently, one chunk at a time."""
    writer = None
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        table = pa.Table.from_pandas(chunk)
        if writer is None:
            # The schema inferred from the first chunk is reused for all chunks
            writer = pq.ParquetWriter(parquet_path, table.schema)
        writer.write_table(table)
    if writer is not None:
        writer.close()


def json_to_parquet(json_path: str, parquet_path: str) -> None:
    """Convert JSON Lines to Parquet."""
    df = pd.read_json(json_path, lines=True)
    df.to_parquet(parquet_path, compression='snappy')
```
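The reverse direction can be streamed the same way, so the Parquet file never has to fit in memory at once. A sketch (the function is ours, not a library API):

```python
def parquet_to_csv(parquet_path: str, csv_path: str, batch_size: int = 100_000) -> None:
    """Stream a Parquet file out to CSV in fixed-size batches."""
    parquet_file = pq.ParquetFile(parquet_path)
    first = True
    for batch in parquet_file.iter_batches(batch_size=batch_size):
        batch.to_pandas().to_csv(
            csv_path,
            mode='w' if first else 'a',  # overwrite once, then append
            header=first,
            index=False,
        )
        first = False
```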