# PyArrow.fs: Native Arrow Filesystems
PyArrow provides its own filesystem abstraction optimized for Arrow/Parquet workflows with zero-copy integration.
## Installation

```bash
# Bundled with PyArrow - no extra deps
pip install pyarrow
```
## Basic Usage

```python
import pyarrow.fs as fs
from pyarrow import parquet as pq

# From URI - auto-detects filesystem type
s3_fs, path = fs.FileSystem.from_uri("s3://bucket/path/to/data/")
print(type(s3_fs))  # <class 'pyarrow._fs.S3FileSystem'>
print(path)         # 'bucket/path/to/data/' (bucket is part of the path)

# GCS via URI
gcs_fs, path = fs.FileSystem.from_uri("gs://my-bucket/data/")

# Local filesystem
local_fs, path = fs.FileSystem.from_uri("file:///home/user/data/")
```
## S3 Configuration

```python
import pyarrow.fs as fs
from pyarrow.fs import S3FileSystem

# Method 1: Explicit construction with options
s3_fs = S3FileSystem(
    access_key='AKIA...',
    secret_key='...',
    session_token='...',  # For temporary credentials
    region='us-west-2',
    endpoint_override='https://minio.local:9000',  # S3-compatible
    scheme='https',
    proxy_options={'scheme': 'http', 'host': 'proxy.company.com', 'port': 8080},
    allow_bucket_creation=True,
    retry_strategy=fs.AwsStandardS3RetryStrategy(max_attempts=5)
)

# Method 2: From URI (reads from environment/AWS config)
s3_fs, path = fs.FileSystem.from_uri("s3://my-bucket/data/")

# File operations (bucket/key paths, not s3:// URIs)
info = s3_fs.get_file_info("bucket/file.parquet")
print(info.size)   # File size in bytes
print(info.mtime)  # Modification time

# Open input stream
with s3_fs.open_input_stream("bucket/file.parquet") as f:
    data = f.read()

# Open output stream for writing
with s3_fs.open_output_stream("bucket/output.parquet") as f:
    f.write(parquet_bytes)

# Copy and delete
s3_fs.copy_file("bucket/src.parquet", "bucket/dst.parquet")
s3_fs.delete_file("bucket/old.parquet")
## Working with Parquet Datasets

```python
import pyarrow.dataset as ds
import pyarrow.fs as fs

# Create S3 filesystem
s3_fs = fs.S3FileSystem(region='us-east-1')

# Load partitioned dataset
dataset = ds.dataset(
    "bucket/dataset/",
    filesystem=s3_fs,
    format="parquet",
    partitioning=ds.HivePartitioning.discover()
)
print(dataset.schema)
print(f"Rows: {dataset.count_rows()}")

# Filter pushdown (only reads relevant files)
table = dataset.to_table(
    filter=(ds.field("year") == 2024) & (ds.field("month") > 6),
    columns=["id", "value", "timestamp"]  # Column pruning
)

# Scan with custom options
scanner = dataset.scanner(
    filter=ds.field("value") > 100,
    batch_size=65536,
    use_threads=True
)
for batch in scanner.to_batches():
    process(batch)
```
## Azure Support via FSSpec Bridge

```python
import adlfs
import pyarrow.fs as fs
import pyarrow.dataset as ds

# Create Azure filesystem via fsspec
# (authenticate with either account_key or the tenant/client service
# principal credentials - not both at once)
azure_fs = adlfs.AzureBlobFileSystem(
    account_name="myaccount",
    account_key="...",
    tenant_id="...",
    client_id="...",
    client_secret="..."
)

# Wrap in PyArrow filesystem
pa_fs = fs.PyFileSystem(fs.FSSpecHandler(azure_fs))

# Use with PyArrow dataset
dataset = ds.dataset(
    "container/path/",
    filesystem=pa_fs,
    format="parquet"
)
```
undefinedAuthentication
身份验证
See for S3, GCS, Azure credential configuration.
@data-engineering-storage-authentication有关S3、GCS、Azure的凭证配置,请参阅。
@data-engineering-storage-authenticationWhen to Use PyArrow.fs
何时使用PyArrow.fs
Choose pyarrow.fs when:
- Your pipeline is Arrow/Parquet-native
- You need zero-copy integration with PyArrow datasets
- Predicate pushdown and column pruning are critical
- Working with partitioned Parquet datasets
- You want minimal dependencies (included in PyArrow)
## Performance Considerations

- ✅ Column pruning: Use the `columns=` parameter to read only needed columns
- ✅ Predicate pushdown: Filter at the dataset level to skip reading irrelevant files
- ✅ Batch scanning: Use `scanner.to_batches()` for large datasets
- ✅ Threading: Enable `use_threads=True` for CPU-bound operations
- ⚠️ For ecosystem integration (pandas, Dask, etc.), fsspec may be more convenient
- ⚠️ For maximum async performance with many small files, consider obstore
## Integration

- Polars: `pl.scan_pyarrow_dataset(dataset)` for lazy evaluation
- PyArrow datasets: Native integration (this is the PyArrow API)
- Delta Lake/Iceberg: Use a PyArrow filesystem when constructing dataset objects