data-engineering-storage-remote-access-libraries-pyarrow-fs


PyArrow.fs: Native Arrow Filesystems


PyArrow provides its own filesystem abstraction optimized for Arrow/Parquet workflows with zero-copy integration.

Installation


```bash
# Bundled with PyArrow - no extra dependencies
pip install pyarrow
```

Basic Usage

```python
import pyarrow.fs as fs
from pyarrow import parquet as pq

# From URI - auto-detects filesystem type
s3_fs, path = fs.FileSystem.from_uri("s3://bucket/path/to/data/")
print(type(s3_fs))  # <class 'pyarrow._fs.S3FileSystem'>
print(path)         # 'bucket/path/to/data' (bucket is part of the path)

# GCS via URI
gcs_fs, path = fs.FileSystem.from_uri("gs://my-bucket/data/")

# Local filesystem
local_fs, path = fs.FileSystem.from_uri("file:///home/user/data/")
```

S3 Configuration

```python
import pyarrow.fs as fs
from pyarrow.fs import S3FileSystem

# Method 1: Explicit construction with options
s3_fs = S3FileSystem(
    access_key='AKIA...',
    secret_key='...',
    session_token='...',  # For temporary credentials
    region='us-west-2',
    endpoint_override='https://minio.local:9000',  # S3-compatible stores
    scheme='https',
    proxy_options={'scheme': 'http', 'host': 'proxy.company.com', 'port': 8080},
    allow_bucket_creation=True,
    retry_strategy=fs.AwsStandardS3RetryStrategy(max_attempts=5)
)

# Method 2: From URI (reads credentials from environment/AWS config)
s3_fs, path = fs.FileSystem.from_uri("s3://my-bucket/data/")

# File operations take bucket/key paths, not s3:// URIs
info = s3_fs.get_file_info("bucket/file.parquet")
print(info.size)   # File size in bytes
print(info.mtime)  # Modification time

# Open input stream
with s3_fs.open_input_stream("bucket/file.parquet") as f:
    data = f.read()

# Open output stream for writing
with s3_fs.open_output_stream("bucket/output.parquet") as f:
    f.write(parquet_bytes)

# Copy and delete
s3_fs.copy_file("bucket/src.parquet", "bucket/dst.parquet")
s3_fs.delete_file("bucket/old.parquet")
```

Working with Parquet Datasets

```python
import pyarrow.dataset as ds
import pyarrow.fs as fs

# Create S3 filesystem
s3_fs = fs.S3FileSystem(region='us-east-1')

# Load partitioned dataset
dataset = ds.dataset(
    "bucket/dataset/",
    filesystem=s3_fs,
    format="parquet",
    partitioning=ds.HivePartitioning.discover()
)
print(dataset.schema)
print(f"Rows: {dataset.count_rows()}")

# Filter pushdown (only reads relevant files)
table = dataset.to_table(
    filter=(ds.field("year") == 2024) & (ds.field("month") > 6),
    columns=["id", "value", "timestamp"]  # Column pruning
)

# Scan with custom options
scanner = dataset.scanner(
    filter=ds.field("value") > 100,
    batch_size=65536,
    use_threads=True
)
for batch in scanner.to_batches():
    process(batch)
```

Azure Support via FSSpec Bridge

```python
import adlfs
import pyarrow.fs as fs
import pyarrow.dataset as ds

# Create Azure filesystem via fsspec
azure_fs = adlfs.AzureBlobFileSystem(
    account_name="myaccount",
    account_key="...",
    tenant_id="...",
    client_id="...",
    client_secret="..."
)

# Wrap in a PyArrow filesystem
pa_fs = fs.PyFileSystem(fs.FSSpecHandler(azure_fs))

# Use with a PyArrow dataset
dataset = ds.dataset(
    "container/path/",
    filesystem=pa_fs,
    format="parquet"
)
```

Authentication

See @data-engineering-storage-authentication for S3, GCS, and Azure credential configuration.

When to Use PyArrow.fs

Choose pyarrow.fs when:
  • Your pipeline is Arrow/Parquet-native
  • You need zero-copy integration with PyArrow datasets
  • Predicate pushdown and column pruning are critical
  • You work with partitioned Parquet datasets
  • You want minimal dependencies (pyarrow.fs ships with PyArrow)

Performance Considerations

  • Column pruning: use the `columns=` parameter to read only the columns you need
  • Predicate pushdown: filter at the dataset level to skip reading irrelevant files
  • Batch scanning: use `scanner.to_batches()` for datasets larger than memory
  • Threading: enable `use_threads=True` for CPU-bound operations
  • ⚠️ For ecosystem integration (pandas, Dask, etc.), fsspec may be more convenient
  • ⚠️ For maximum async performance with many small files, consider obstore

Integration


  • Polars: `pl.scan_pyarrow_dataset(dataset)` for lazy evaluation
  • PyArrow datasets: native integration (this is the PyArrow API)
  • Delta Lake/Iceberg: pass a PyArrow filesystem when constructing dataset objects


References
