parquet-coder

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Parquet-Coder

Parquet-Coder

Patterns for efficient columnar data storage with Parquet.
Parquet 高效列式数据存储的实践模式

Basic Operations

基础操作

python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

Write with compression

带压缩写入

df.to_parquet('data.parquet', compression='snappy', index=False)
df.to_parquet('data.parquet', compression='snappy', index=False)

Common compression options:

常见压缩选项:

- snappy: Fast, good compression (default)

- snappy: 速度快,压缩效果好(默认)

- gzip: Slower, better compression

- gzip: 速度较慢,压缩效果更好

- zstd: Best balance of speed/compression

- zstd: 速度与压缩效果的最佳平衡

- None: No compression (fastest writes)

- None: 无压缩(写入速度最快)

Read entire file

读取整个文件

df = pd.read_parquet('data.parquet')
df = pd.read_parquet('data.parquet')

Read specific columns only (predicate pushdown)

仅读取指定列(谓词下推)

df = pd.read_parquet('data.parquet', columns=['id', 'name', 'value'])
undefined
df = pd.read_parquet('data.parquet', columns=['id', 'name', 'value'])
undefined

PyArrow for Large Files

处理大文件的 PyArrow 用法

python
undefined
python
undefined

Read as PyArrow Table (more memory efficient)

以 PyArrow Table 格式读取(内存更高效)

table = pq.read_table('data.parquet')
table = pq.read_table('data.parquet')

Convert to pandas when needed

需要时转换为 pandas DataFrame

df = table.to_pandas()
df = table.to_pandas()

Filter while reading (row group filtering)

读取时过滤数据(行组过滤)

table = pq.read_table( 'data.parquet', filters=[ ('date', '>=', '2024-01-01'), ('status', '=', 'active') ] )
table = pq.read_table( 'data.parquet', filters=[ ('date', '>=', '2024-01-01'), ('status', '=', 'active') ] )

Read in batches for huge files

分批读取超大文件

parquet_file = pq.ParquetFile('huge.parquet') for batch in parquet_file.iter_batches(batch_size=100_000): df_batch = batch.to_pandas() process(df_batch)
undefined
parquet_file = pq.ParquetFile('huge.parquet') for batch in parquet_file.iter_batches(batch_size=100_000): df_batch = batch.to_pandas() process(df_batch)
undefined

Partitioned Datasets

分区数据集

python
undefined
python
undefined

Write partitioned by columns

按列分区写入

df.to_parquet( 'data/', partition_cols=['year', 'month'], compression='snappy' )
df.to_parquet( 'data/', partition_cols=['year', 'month'], compression='snappy' )

Creates: data/year=2024/month=01/part-0.parquet

生成路径:data/year=2024/month=01/part-0.parquet

Read partitioned dataset

读取分区数据集

df = pd.read_parquet('data/') # Reads all partitions
df = pd.read_parquet('data/') # 读取所有分区

Read specific partitions only

仅读取指定分区

df = pd.read_parquet('data/year=2024/')
df = pd.read_parquet('data/year=2024/')

With PyArrow dataset API (more control)

使用 PyArrow Dataset API(控制更精细)

import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet', partitioning='hive')
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet', partitioning='hive')

Filter on partition columns (very fast)

对分区列进行过滤(速度极快)

table = dataset.to_table( filter=(ds.field('year') == 2024) & (ds.field('month') >= 6) )
undefined
table = dataset.to_table( filter=(ds.field('year') == 2024) & (ds.field('month') >= 6) )
undefined

Schema Definition

Schema 定义

python
undefined
python
undefined

Explicit schema for consistency

显式定义 Schema 以保证一致性

schema = pa.schema([ ('id', pa.int64()), ('name', pa.string()), ('value', pa.float64()), ('date', pa.date32()), ('tags', pa.list_(pa.string())), ('metadata', pa.map_(pa.string(), pa.string())), ])
schema = pa.schema([ ('id', pa.int64()), ('name', pa.string()), ('value', pa.float64()), ('date', pa.date32()), ('tags', pa.list_(pa.string())), ('metadata', pa.map_(pa.string(), pa.string())), ])

Write with schema

按指定 Schema 写入

table = pa.Table.from_pandas(df, schema=schema) pq.write_table(table, 'data.parquet')
table = pa.Table.from_pandas(df, schema=schema) pq.write_table(table, 'data.parquet')

Read and validate schema

读取并验证 Schema

file_schema = pq.read_schema('data.parquet') assert file_schema.equals(schema), "Schema mismatch!"
undefined
file_schema = pq.read_schema('data.parquet') assert file_schema.equals(schema), "Schema 不匹配!"
undefined

Schema Evolution

Schema 演进

python
def merge_schemas(old_schema: pa.Schema, new_schema: pa.Schema) -> pa.Schema:
    """Create unified schema from old and new."""
    fields = {f.name: f for f in old_schema}
    for field in new_schema:
        if field.name not in fields:
            fields[field.name] = field
        elif fields[field.name].type != field.type:
            # Handle type conflicts (e.g., promote int to float)
            fields[field.name] = pa.field(
                field.name,
                promote_type(fields[field.name].type, field.type)
            )
    return pa.schema(list(fields.values()))

def append_with_schema_evolution(
    existing_path: str,
    new_df: pd.DataFrame,
    output_path: str
) -> None:
    """Append data with automatic schema evolution."""
    existing = pq.read_table(existing_path)
    new_table = pa.Table.from_pandas(new_df)

    # Unify schemas
    unified_schema = pa.unify_schemas([existing.schema, new_table.schema])

    # Cast both to unified schema
    existing = existing.cast(unified_schema)
    new_table = new_table.cast(unified_schema)

    # Concatenate and write
    combined = pa.concat_tables([existing, new_table])
    pq.write_table(combined, output_path)
python
def merge_schemas(old_schema: pa.Schema, new_schema: pa.Schema) -> pa.Schema:
    """从旧 Schema 和新 Schema 创建统一的 Schema。"""
    fields = {f.name: f for f in old_schema}
    for field in new_schema:
        if field.name not in fields:
            fields[field.name] = field
        elif fields[field.name].type != field.type:
            # 处理类型冲突(例如,将 int 升级为 float)
            fields[field.name] = pa.field(
                field.name,
                promote_type(fields[field.name].type, field.type)
            )
    return pa.schema(list(fields.values()))

def append_with_schema_evolution(
    existing_path: str,
    new_df: pd.DataFrame,
    output_path: str
) -> None:
    """自动适配 Schema 演进并追加数据。"""
    existing = pq.read_table(existing_path)
    new_table = pa.Table.from_pandas(new_df)

    # 统一 Schema
    unified_schema = pa.unify_schemas([existing.schema, new_table.schema])

    # 将两者转换为统一 Schema
    existing = existing.cast(unified_schema)
    new_table = new_table.cast(unified_schema)

    # 合并并写入
    combined = pa.concat_tables([existing, new_table])
    pq.write_table(combined, output_path)

Row Group Optimization

行组优化

python
undefined
python
undefined

Control row group size (affects read performance)

控制行组大小(影响读取性能)

pq.write_table( table, 'data.parquet', row_group_size=100_000, # Rows per group compression='snappy' )
pq.write_table( table, 'data.parquet', row_group_size=100_000, # 每个组的行数 compression='snappy' )

Read metadata to see row groups

读取元数据查看行组信息

parquet_file = pq.ParquetFile('data.parquet') print(f"Num row groups: {parquet_file.metadata.num_row_groups}") print(f"Num rows: {parquet_file.metadata.num_rows}")
parquet_file = pq.ParquetFile('data.parquet') print(f"行组数量: {parquet_file.metadata.num_row_groups}") print(f"总行数: {parquet_file.metadata.num_rows}")

Read specific row groups

读取指定行组

table = parquet_file.read_row_groups([0, 1]) # First two groups
undefined
table = parquet_file.read_row_groups([0, 1]) # 前两个行组
undefined

Metadata and Statistics

元数据与统计信息

python
undefined
python
undefined

Add custom metadata

添加自定义元数据

custom_metadata = { b'created_by': b'etl_pipeline', b'version': b'1.0', b'source': b'api_export' } schema = table.schema.with_metadata(custom_metadata) table = table.cast(schema) pq.write_table(table, 'data.parquet')
custom_metadata = { b'created_by': b'etl_pipeline', b'version': b'1.0', b'source': b'api_export' } schema = table.schema.with_metadata(custom_metadata) table = table.cast(schema) pq.write_table(table, 'data.parquet')

Read metadata

读取元数据

parquet_file = pq.ParquetFile('data.parquet') print(parquet_file.schema.metadata)
parquet_file = pq.ParquetFile('data.parquet') print(parquet_file.schema.metadata)

Column statistics (min/max for filtering)

列统计信息(用于过滤的最小值/最大值)

metadata = parquet_file.metadata for i in range(metadata.num_row_groups): rg = metadata.row_group(i) for j in range(rg.num_columns): col = rg.column(j) if col.statistics: print(f"{col.path_in_schema}: min={col.statistics.min}, max={col.statistics.max}")
undefined
metadata = parquet_file.metadata for i in range(metadata.num_row_groups): rg = metadata.row_group(i) for j in range(rg.num_columns): col = rg.column(j) if col.statistics: print(f"{col.path_in_schema}: min={col.statistics.min}, max={col.statistics.max}")
undefined

Delta Lake Integration

Delta Lake 集成

python
from deltalake import DeltaTable, write_deltalake
python
from deltalake import DeltaTable, write_deltalake

Write as Delta table (versioned parquet)

写入为 Delta 表(带版本控制的 Parquet)

write_deltalake('delta_table/', df, mode='overwrite')
write_deltalake('delta_table/', df, mode='overwrite')

Append data

追加数据

write_deltalake('delta_table/', new_df, mode='append')
write_deltalake('delta_table/', new_df, mode='append')

Read Delta table

读取 Delta 表

dt = DeltaTable('delta_table/') df = dt.to_pandas()
dt = DeltaTable('delta_table/') df = dt.to_pandas()

Time travel

时间旅行

df_old = dt.load_version(0).to_pandas()
df_old = dt.load_version(0).to_pandas()

Compact small files

合并小文件

dt.optimize.compact()
dt.optimize.compact()

Vacuum old versions

清理旧版本

dt.vacuum(retention_hours=168) # Keep 7 days
undefined
dt.vacuum(retention_hours=168) # 保留7天
undefined

Performance Tips

性能优化技巧

python
undefined
python
undefined

1. Use appropriate column types

1. 使用合适的列类型

schema = pa.schema([ ('category', pa.dictionary(pa.int8(), pa.string())), # For repeated strings ('count', pa.int32()), # Not int64 if values fit ])
schema = pa.schema([ ('category', pa.dictionary(pa.int8(), pa.string())), # 用于重复字符串 ('count', pa.int32()), # 如果数值范围符合,无需使用 int64 ])

2. Sort data before writing (improves predicate pushdown)

2. 写入前排序数据(提升谓词下推性能)

df = df.sort_values(['date', 'category']) df.to_parquet('sorted.parquet')
df = df.sort_values(['date', 'category']) df.to_parquet('sorted.parquet')

3. Use column selection

3. 仅选择需要的列

df = pd.read_parquet('data.parquet', columns=['needed', 'columns'])
df = pd.read_parquet('data.parquet', columns=['needed', 'columns'])

4. Use filters for row pruning

4. 使用过滤条件进行行裁剪

df = pd.read_parquet('data/', filters=[('status', '==', 'active')])
df = pd.read_parquet('data/', filters=[('status', '==', 'active')])

5. Parallel reads

5. 并行读取

import pyarrow.dataset as ds dataset = ds.dataset('data/', format='parquet') table = dataset.to_table(use_threads=True)
undefined
import pyarrow.dataset as ds dataset = ds.dataset('data/', format='parquet') table = dataset.to_table(use_threads=True)
undefined

Conversion Utilities

转换工具

python
def csv_to_parquet(
    csv_path: str,
    parquet_path: str,
    chunk_size: int = 100_000
) -> None:
    """Convert large CSV to Parquet efficiently."""
    writer = None

    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        table = pa.Table.from_pandas(chunk)

        if writer is None:
            writer = pq.ParquetWriter(parquet_path, table.schema)

        writer.write_table(table)

    writer.close()

def json_to_parquet(json_path: str, parquet_path: str) -> None:
    """Convert JSON lines to Parquet."""
    df = pd.read_json(json_path, lines=True)
    df.to_parquet(parquet_path, compression='snappy')
python
def csv_to_parquet(
    csv_path: str,
    parquet_path: str,
    chunk_size: int = 100_000
) -> None:
    """高效将大型 CSV 转换为 Parquet。"""
    writer = None

    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        table = pa.Table.from_pandas(chunk)

        if writer is None:
            writer = pq.ParquetWriter(parquet_path, table.schema)

        writer.write_table(table)

    writer.close()

def json_to_parquet(json_path: str, parquet_path: str) -> None:
    """将 JSON 行转换为 Parquet。"""
    df = pd.read_json(json_path, lines=True)
    df.to_parquet(parquet_path, compression='snappy')