# Python BigQuery SDK

Provides workflows and best practices for the `google-cloud-bigquery` Python client library (v3.x) covering client setup, querying, schema definition, data loading, and result consumption.
## Installation

```bash
pip install google-cloud-bigquery
```

Optional extras for Arrow/pandas integration:

```bash
pip install "google-cloud-bigquery[pandas,pyarrow]"
```

## Client Initialisation
Instantiate the `Client` once per process and reuse it. The client is thread-safe.

```python
from google.cloud import bigquery

# Picks up credentials from GOOGLE_APPLICATION_CREDENTIALS or ADC
client = bigquery.Client(project="my-project")

# Explicit project + credentials
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file("key.json")
client = bigquery.Client(project="my-project", credentials=credentials)
```

**Key rules:**
- Never create a `Client` inside a per-request or per-row function.
- Always set `project` explicitly in production; avoid relying on environment inference.
- Close the client with `client.close()` or use it as a context manager when appropriate.
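The "instantiate once per process" rule is often enforced with a module-level cached factory. A minimal sketch using `functools.lru_cache` — the `FakeClient` class is a hypothetical stand-in so the snippet runs without the library installed; substitute `bigquery.Client` in real code:

```python
from functools import lru_cache


class FakeClient:
    """Stand-in for bigquery.Client so this sketch is self-contained."""

    def __init__(self, project):
        self.project = project


@lru_cache(maxsize=None)
def get_client(project="my-project"):
    # All callers share one instance per project; this is safe because
    # the BigQuery client is thread-safe.
    return FakeClient(project)
```

Every call with the same arguments returns the same object, so per-request handlers can call `get_client()` freely without paying construction cost.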
## Running Queries
### Simple query (blocking)
```python
query = "SELECT name, age FROM `my-project.my_dataset.users` WHERE age > 18"
rows = client.query_and_wait(query)  # Returns RowIterator directly (v3+)
for row in rows:
    print(row["name"], row.age)
```

For short interactive queries, `query_and_wait` is the recommended pattern, in preference to the legacy `client.query(...).result()`.

### Parameterised queries (mandatory for untrusted input)
Always use query parameters — never string-format user input into SQL.

```python
from google.cloud.bigquery import ScalarQueryParameter, QueryJobConfig

config = QueryJobConfig(
    query_parameters=[
        ScalarQueryParameter("min_age", "INT64", 18),
        ScalarQueryParameter("country", "STRING", "US"),
    ]
)
sql = """
    SELECT name FROM `project.dataset.users`
    WHERE age > @min_age AND country = @country
"""
rows = client.query_and_wait(sql, job_config=config)
```

Parameter types map to BigQuery SQL types: `"STRING"`, `"INT64"`, `"FLOAT64"`, `"BOOL"`, `"TIMESTAMP"`, `"DATE"`, `"BYTES"`.

Use `ArrayQueryParameter` for list inputs:

```python
from google.cloud.bigquery import ArrayQueryParameter

config = QueryJobConfig(
    query_parameters=[
        ArrayQueryParameter("ids", "INT64", [1, 2, 3]),
    ]
)
```
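The type mapping above can be automated when parameters are built from arbitrary Python values. A hypothetical helper (not part of the library) that infers the BigQuery parameter type from a value's Python type:

```python
from datetime import date, datetime

# Order matters: bool before int (bool subclasses int), and
# datetime before date (datetime subclasses date).
_PARAM_TYPES = [
    (bool, "BOOL"),
    (datetime, "TIMESTAMP"),
    (date, "DATE"),
    (bytes, "BYTES"),
    (str, "STRING"),
    (int, "INT64"),
    (float, "FLOAT64"),
]


def bq_param_type(value):
    """Return the BigQuery SQL type name for a Python value."""
    for py_type, bq_type in _PARAM_TYPES:
        if isinstance(value, py_type):
            return bq_type
    raise TypeError(f"No BigQuery type mapping for {type(value).__name__}")
```

For example, `bq_param_type(18)` yields `"INT64"`, so `ScalarQueryParameter("min_age", bq_param_type(18), 18)` keeps the declared type consistent with the value.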
### Asynchronous / long-running queries
python
job = client.query(sql, job_config=config) # Returns QueryJob immediatelypython
job = client.query(sql, job_config=config) # 立即返回QueryJob对象... do other work ...
... 执行其他任务 ...
rows = job.result(timeout=300) # Block until complete or timeout
print(f"Bytes processed: {job.total_bytes_processed}")
Check `job.state` (`"RUNNING"`, `"DONE"`) and `job.error_result` before consuming results.rows = job.result(timeout=300) # 阻塞直到任务完成或超时
print(f"处理数据量:{job.total_bytes_processed}")
在处理结果前,请检查`job.state`(可选值:`"RUNNING"`、`"DONE"`)与`job.error_result`。Dry-run (cost estimation)
预运行(成本估算)
```python
dry_config = QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(sql, job_config=dry_config)
print(f"Estimated bytes: {job.total_bytes_processed}")
```
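A dry-run byte count converts directly to an approximate on-demand cost. A hypothetical helper (the per-TiB rate is an assumption; check current BigQuery on-demand pricing for your region):

```python
PRICE_PER_TIB_USD = 6.25  # assumed on-demand rate; verify against current pricing


def estimate_query_cost(total_bytes: int) -> float:
    """Rough USD cost for an on-demand query scanning total_bytes."""
    return (total_bytes / 2**40) * PRICE_PER_TIB_USD


# e.g. estimate_query_cost(job.total_bytes_processed) after a dry run
```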
## Consuming Results
### Iterate rows
```python
for row in rows:
    value = row["column_name"]  # by name
    value = row[0]              # by position
    value = row.column_name     # attribute access
```

### Convert to pandas DataFrame
```python
df = rows.to_dataframe()  # requires pandas + pyarrow
df = rows.to_dataframe(dtypes={"age": "Int64"})
```

### Convert to Arrow Table
```python
table = rows.to_arrow()
```

For large result sets, `to_arrow()` is faster than `to_dataframe()`; convert to a DataFrame afterwards if one is needed.

### Page size control
```python
rows = client.query_and_wait(sql, max_results=1000)  # cap result rows
```

## Schema Definition
Define schemas explicitly — never rely on autodetect in production.

```python
schema = [
    bigquery.SchemaField("user_id", "INT64", mode="REQUIRED"),
    bigquery.SchemaField("email", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField(
        "address",
        "RECORD",
        mode="NULLABLE",
        fields=[
            bigquery.SchemaField("city", "STRING"),
            bigquery.SchemaField("postcode", "STRING"),
        ],
    ),
]
```

| Mode | Meaning |
|---|---|
| `REQUIRED` | NOT NULL; value must be present |
| `NULLABLE` | Default; value may be NULL |
| `REPEATED` | Array of the given type |

Standard SQL types: `STRING`, `BYTES`, `INTEGER`/`INT64`, `FLOAT`/`FLOAT64`, `NUMERIC`, `BIGNUMERIC`, `BOOLEAN`/`BOOL`, `TIMESTAMP`, `DATE`, `TIME`, `DATETIME`, `GEOGRAPHY`, `JSON`, `RECORD`/`STRUCT`.
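Explicit schemas also enable cheap client-side checks before loading. A hypothetical validator, using plain `(name, type, mode)` tuples as a stand-in for `SchemaField` so the snippet is self-contained:

```python
SCHEMA_SPEC = [
    ("user_id", "INT64", "REQUIRED"),
    ("email", "STRING", "REQUIRED"),
    ("created_at", "TIMESTAMP", "NULLABLE"),
]


def missing_required(row: dict) -> list:
    """Names of REQUIRED fields that are absent (or None) in the row."""
    return [
        name
        for name, _type, mode in SCHEMA_SPEC
        if mode == "REQUIRED" and row.get(name) is None
    ]
```

Rejecting bad rows before a load job fails is cheaper than debugging a partially applied `WRITE_APPEND`.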
## Creating Tables
```python
dataset_ref = client.dataset("my_dataset")
table_ref = dataset_ref.table("my_table")
table = bigquery.Table(table_ref, schema=schema)

# Time partitioning
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="created_at",
    expiration_ms=7 * 24 * 60 * 60 * 1000,  # 7 days
)

# Clustering
table.clustering_fields = ["country", "user_id"]

table = client.create_table(table, exists_ok=True)
```
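The inline `expiration_ms` arithmetic above is easy to get wrong; a tiny helper (hypothetical, not part of the library) makes the intent explicit:

```python
def days_to_ms(days: int) -> int:
    """Partition expiration is specified in milliseconds."""
    return days * 24 * 60 * 60 * 1000


# e.g. expiration_ms=days_to_ms(7)
```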
## Loading Data
### From a local file
```python
job_config = bigquery.LoadJobConfig(
    schema=schema,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
with open("data.ndjson", "rb") as f:
    job = client.load_table_from_file(f, table_ref, job_config=job_config)
job.result()  # Wait for completion
```

### From Google Cloud Storage
```python
uri = "gs://my-bucket/data/*.parquet"
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
job.result()
```

### From a pandas DataFrame
```python
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()
```

### Write dispositions
| Value | Behaviour |
|---|---|
| `WRITE_TRUNCATE` | Replace all existing rows |
| `WRITE_APPEND` | Add rows to existing data |
| `WRITE_EMPTY` | Fail if table already contains data |

## Inserting Rows (Streaming)
Use the Storage Write API via `google-cloud-bigquery-storage` for high-throughput streaming. For simple cases:

```python
errors = client.insert_rows_json(table_ref, [
    {"user_id": 1, "email": "a@example.com"},
    {"user_id": 2, "email": "b@example.com"},
])
if errors:
    raise RuntimeError(f"Streaming insert errors: {errors}")
```

Caution: `insert_rows_json` does not guarantee exactly-once delivery and has per-row cost. Prefer batch load jobs for bulk ingestion.
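Each `insert_rows_json` call sends one HTTP payload, so large row lists are usually split into batches first. A minimal chunking sketch — the 500-row batch size is a conventional choice, an assumption rather than a documented limit here:

```python
def chunked(rows, size=500):
    """Yield consecutive slices of at most `size` rows."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]


# Usage sketch, assuming `client` and `table_ref` as above:
# for batch in chunked(all_rows):
#     errors = client.insert_rows_json(table_ref, batch)
#     if errors:
#         raise RuntimeError(f"Streaming insert errors: {errors}")
```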
## Error Handling
```python
from google.cloud.exceptions import GoogleCloudError
from google.api_core.exceptions import BadRequest, NotFound

try:
    rows = client.query_and_wait(sql)
except BadRequest as exc:
    # SQL syntax / schema errors
    print(f"Query error: {exc.message}")
except NotFound as exc:
    print(f"Table or dataset not found: {exc}")
except GoogleCloudError as exc:
    print(f"API error: {exc}")
```

Always inspect `job.errors` after async jobs:

```python
job = client.query(sql)
job.result()
if job.errors:
    for err in job.errors:
        print(err["message"], err.get("reason"))
```
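The `job.errors` entries are plain dicts, so a small formatter (hypothetical helper) can collapse them into one message for logs or exceptions:

```python
def format_job_errors(errors) -> str:
    """Join a job.errors list (dicts with 'message' and optional 'reason')."""
    return "; ".join(
        f"{e['message']} ({e.get('reason', 'unknown')})" for e in errors or []
    )


# e.g. after job.result():
# if job.errors:
#     raise RuntimeError(format_job_errors(job.errors))
```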
## Quick Reference: Key Classes
| Class | Module | Purpose |
|---|---|---|
| `Client` | `google.cloud.bigquery` | Entry point for all API calls |
| `QueryJobConfig` | `google.cloud.bigquery` | Query execution options |
| `LoadJobConfig` | `google.cloud.bigquery` | Load job options |
| `SchemaField` | `google.cloud.bigquery` | Column definition |
| `Table` | `google.cloud.bigquery` | Table resource |
| `Dataset` | `google.cloud.bigquery` | Dataset resource |
| `TimePartitioning` | `google.cloud.bigquery` | Partitioning config |
| `ScalarQueryParameter` | `google.cloud.bigquery` | Single-value param |
| `ArrayQueryParameter` | `google.cloud.bigquery` | Array param |
| `StructQueryParameter` | `google.cloud.bigquery` | Struct param |
| `WriteDisposition` | `google.cloud.bigquery` | Overwrite vs append |
| `SourceFormat` | `google.cloud.bigquery` | Input file format |
## Additional Resources

### Reference Files

For deeper coverage consult:

- `references/advanced-patterns.md` — Schema evolution, partitioned table queries, export jobs, retry configuration, DB-API usage, and performance patterns