implementing-warehouse-sources

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Implementing Data warehouse sources

实现数据仓库源

Use this skill when building or updating Data warehouse sources in
posthog/temporal/data_imports/sources/
.
本技能适用于在
posthog/temporal/data_imports/sources/
中构建或更新数据仓库源的场景。

Read first

前置阅读

Before coding, read:
  • posthog/temporal/data_imports/sources/source.template
  • posthog/temporal/data_imports/sources/README.md
  • 1 API source with
    settings.py
    + transport logic (e.g. klaviyo, github). For dependent-resource fan-out (parent→child with
    type: "resolve"
    ), also read
    posthog/temporal/data_imports/sources/common/rest_source/__init__.py
    and
    config_setup.py
    (e.g.
    process_parent_data_item
    ,
    make_parent_key_name
    ).
编码前,请阅读:
  • posthog/temporal/data_imports/sources/source.template
  • posthog/temporal/data_imports/sources/README.md
  • 一个包含
    settings.py
    和传输逻辑的API源(例如klaviyo、github)。对于依赖资源的扇出(父→子,使用
    type: "resolve"
    ),还需阅读
    posthog/temporal/data_imports/sources/common/rest_source/__init__.py
    config_setup.py
    (例如
    process_parent_data_item
    make_parent_key_name
    )。

Source architecture contract

源架构约定

For API-backed sources, use this split:
  • source.py
    : source registration, source form fields, schema list, credential validation, and pipeline handoff.
  • settings.py
    : endpoint catalog, incremental fields, primary key and partition defaults.
  • {source}.py
    : API client/auth, paginator, request params, row normalization, and
    SourceResponse
    .
This keeps endpoint behavior declarative and easy to extend.
For REST sources that mix top-level and fan-out endpoints, keep endpoint metadata in
settings.py
and route in
{source}.py
with this priority:
  1. endpoint-specific custom iterators (only when required),
  2. generic fan-out helper path,
  3. top-level endpoint path.
对于基于API的源,采用以下拆分方式:
  • source.py
    :源注册、源表单字段、 schema列表、凭证验证和流水线交接。
  • settings.py
    :端点目录、增量字段、主键和分区默认值。
  • {source}.py
    :API客户端/认证、分页器、请求参数、行规范化和
    SourceResponse
这种拆分可使端点行为保持声明式,便于扩展。
对于混合顶级端点和扇出端点的REST源,将端点元数据保留在
settings.py
中,并按以下优先级在
{source}.py
中路由:
  1. 端点特定的自定义迭代器(仅在必要时使用),
  2. 通用扇出辅助路径,
  3. 顶级端点路径。

Implementation checklist

实现检查清单

Copy this and track progress:
text
Source implementation:
- [ ] Define source fields in `get_source_config`
- [ ] Implement credential validation
- [ ] Define schemas in `get_schemas`
- [ ] Add/confirm endpoint settings (`settings.py`)
- [ ] Implement transport and paginator (`{source}.py`)
- [ ] Return correct `SourceResponse` (keys, partitioning, sort mode)
- [ ] Add non-retryable auth/permission errors
- [ ] Add source tests
- [ ] Add transport tests
- [ ] Add icon in `frontend/public/services/`
- [ ] Run `pnpm run generate:source-configs`
- [ ] Run `pnpm run schema:build`
- [ ] For Beta: set `betaSource=True` in `SourceConfig`; omit `unreleasedSource` (or set `False`) when releasing.
复制以下内容并跟踪进度:
text
源实现:
- [ ] 在`get_source_config`中定义源字段
- [ ] 实现凭证验证
- [ ] 在`get_schemas`中定义schemas
- [ ] 添加/确认端点设置(`settings.py`)
- [ ] 实现传输和分页器(`{source}.py`)
- [ ] 返回正确的`SourceResponse`(键、分区、排序模式)
- [ ] 添加不可重试的认证/权限错误处理
- [ ] 添加源测试
- [ ] 添加传输测试
- [ ] 在`frontend/public/services/`中添加图标
- [ ] 运行`pnpm run generate:source-configs`
- [ ] 运行`pnpm run schema:build`
- [ ] 若为Beta版本:在`SourceConfig`中设置`betaSource=True`;正式发布时省略`unreleasedSource`(或设为`False`)。

Required coding conventions

必选编码规范

  • Register with
    @SourceRegistry.register
    .
  • Source class should inherit
    SimpleSource[GeneratedConfig]
    unless resumable/webhook behavior is required.
  • API sources should usually return
    table_format="delta"
    in endpoint resources.
  • Use
    primary_keys
    for incremental merge safety; they are endpoint-specific (declare in
    settings.py
    , not always
    id
    ).
  • Add partitioning for new sources where possible:
    • API sources:
      partition_mode="datetime"
      with stable datetime field when available.
  • Add
    get_non_retryable_errors()
    for known permanent failures (401/403/invalid credentials).
  • Keep comments minimal and only when intent is not obvious.
  • 使用
    @SourceRegistry.register
    进行注册。
  • 源类应继承
    SimpleSource[GeneratedConfig]
    ,除非需要可恢复/ webhook行为。
  • API源通常应在端点资源中返回
    table_format="delta"
  • 使用
    primary_keys
    确保增量合并安全;它们是端点特定的(在
    settings.py
    中声明,不一定是
    id
    )。
  • 尽可能为新源添加分区:
    • API源:当有稳定的日期时间字段时,使用
      partition_mode="datetime"
  • 为已知的永久失败(401/403/无效凭证)添加
    get_non_retryable_errors()
  • 尽量减少注释,仅在意图不明确时添加。

Incremental sync guidance

增量同步指南

  • If API supports server-side time filtering, add it and map from
    db_incremental_field_last_value
    .
  • If API only supports cursor pagination, still declare incremental fields if reliable and let merge semantics dedupe.
  • Set
    sort_mode="desc"
    only if the endpoint truly returns descending order and cannot return ascending.
  • For descending sources, make sure behavior with
    db_incremental_field_earliest_value
    is considered.
  • Default unknown endpoints to full refresh first; only enable incremental after confirming a stable filter field and API semantics.
  • Prefer immutable partition keys (
    created_at
    ,
    dateCreated
    ,
    firstSeen
    ) over mutable fields (
    updated_at
    ,
    lastSeen
    ) when both exist.
  • Confirm partition keys against response schemas, not assumptions from endpoint names.
  • 如果API支持服务器端时间过滤,添加该功能并映射自
    db_incremental_field_last_value
  • 如果API仅支持游标分页,若字段可靠仍需声明增量字段,让合并语义处理去重。
  • 仅当端点确实返回降序且无法返回升序时,设置
    sort_mode="desc"
  • 对于降序源,需考虑
    db_incremental_field_earliest_value
    的行为。
  • 未知端点默认先执行全量刷新;仅在确认有稳定的过滤字段和API语义后,再启用增量同步。
  • 当同时存在可变字段(
    updated_at
    lastSeen
    )和不可变分区键(
    created_at
    dateCreated
    firstSeen
    )时,优先选择不可变分区键。
  • 根据响应schema确认分区键,不要根据端点名称假设。

API behavior verification checklist

API行为验证清单

Before finalizing endpoint logic, verify these from docs (or reliable API examples):
  • Response shape: list vs object vs wrapped data (
    {"data": [...]}
    ).
  • Pagination contract: Link header vs body cursor vs offset/page; next-page termination signal.
  • Ordering guarantees: ascending/descending/undefined for key time fields.
  • Rate limit headers and semantics (window reset timestamp, concurrent limits).
  • Field stability: whether candidate incremental/partition fields can change over time.
If behavior is not documented, keep parsing/merge logic conservative and add a code comment documenting the uncertainty.
在确定端点逻辑之前,根据文档(或可靠的API示例)验证以下内容:
  • 响应结构:列表 vs 对象 vs 包装数据(
    {"data": [...]}
    )。
  • 分页约定:Link头 vs 主体游标 vs 偏移量/页码;下一页终止信号。
  • 排序保证:关键时间字段的升序/降序/未定义。
  • 速率限制头和语义(窗口重置时间戳、并发限制)。
  • 字段稳定性:候选增量/分区字段是否会随时间变化。
如果行为未被记录,保持解析/合并逻辑保守,并添加代码注释记录不确定性。

Endpoint inventory workflow

端点清单工作流

  • Build an endpoint inventory before expanding coverage:
    • endpoint path and auth scopes,
    • grain (org/project/child fan-out),
    • pagination style,
    • primary key shape (single/composite),
    • incremental candidate fields.
  • Keep the inventory in source-local docs (for example
    posthog/temporal/data_imports/sources/<source>/api_inventory.md
    ) so future endpoint additions stay consistent.
  • Add endpoints in phases:
    • org-level list endpoints first,
    • then project-level fan-out,
    • then child/fan-out endpoints with bounded pagination.
  • 在扩展覆盖范围之前,构建端点清单:
    • 端点路径和认证范围,
    • 粒度(组织/项目/子扇出),
    • 分页样式,
    • 主键结构(单键/复合键),
    • 候选增量字段。
  • 将清单保存在源本地文档中(例如
    posthog/temporal/data_imports/sources/<source>/api_inventory.md
    ),以便未来添加端点时保持一致性。
  • 分阶段添加端点:
    • 先添加组织级列表端点,
    • 然后添加项目级扇出端点,
    • 最后添加带有限制分页的子/扇出端点。

Top-level endpoints (org/account level)

顶级端点(组织/账户级)

Top-level endpoints are list/read endpoints that do not require parent-row expansion.
  • Declare endpoint metadata in
    settings.py
    (
    path
    ,
    primary_key
    ,
    incremental_fields
    ,
    partition_key
    ,
    sort_mode
    ).
  • Build them through a single resource config (
    get_resource(...)
    style helper) and keep transport branches minimal.
  • Keep endpoint params declarative and stable (
    limit
    , required filters).
  • Use merge write disposition only when incremental semantics are reliable; otherwise full replace is safer.
顶级端点是不需要父行扩展的列表/读取端点。
  • settings.py
    中声明端点元数据(
    path
    primary_key
    incremental_fields
    partition_key
    sort_mode
    )。
  • 通过单个资源配置(
    get_resource(...)
    风格的辅助函数)构建,并尽量减少传输分支。
  • 保持端点参数声明式且稳定(
    limit
    、必填过滤器)。
  • 仅当增量语义可靠时使用合并写入方式;否则全量替换更安全。

Pagination tips

分页技巧

  • Some APIs use cursor pagination in
    Link
    headers — check both
    rel="next"
    and any results flag the API may use.
  • When following a full cursor URL from response headers, clear request params in paginator
    update_request
    to avoid duplicate query params.
  • For parent/child fan-out, keep hard page caps per parent resource to avoid unbounded scans.
  • Emit structured logs when page caps are reached (include resource name and parent identifiers) so operators can tune limits safely.
  • 某些API在
    Link
    头中使用游标分页 —— 检查
    rel="next"
    和API可能使用的任何结果标志。
  • 当从响应头中跟随完整的游标URL时,在分页器
    update_request
    中清除请求参数,避免重复的查询参数。
  • 对于父/子扇出,为每个父资源设置硬页面上限,避免无限制扫描。
  • 当达到页面上限时输出结构化日志(包含资源名称和父标识符),以便运维人员安全调整限制。

Retry and throttling strategy

重试和限流策略

  • Use a retry framework (for example tenacity) instead of manual retry loops where possible.
  • Retry transport failures and retryable status codes (
    429
    , transient
    5xx
    ).
  • Prefer server-provided rate-limit reset headers for wait calculation on
    429
    ; fall back to exponential backoff when unavailable.
  • Keep retries bounded and deterministic (
    stop_after_attempt
    ), and preserve clear terminal behavior:
    • return final response for retried status responses when useful for downstream handling, or
    • raise final exception for transport failures.
  • Keep timeout and retry settings near the top of the module for easy operator tuning.
  • 尽可能使用重试框架(例如tenacity),而非手动重试循环。
  • 重试传输失败和可重试状态码(
    429
    、临时
    5xx
    )。
  • 429
    状态下,优先使用服务器提供的速率限制重置头计算等待时间;不可用时回退到指数退避。
  • 保持重试有界且确定(
    stop_after_attempt
    ),并保留清晰的终端行为:
    • 若对下游处理有用,返回重试状态响应的最终结果,或
    • 对传输失败抛出最终异常。
  • 将超时和重试设置放在模块顶部,便于运维人员调整。

Fan-out endpoints

扇出端点

Fan-out means iterating a parent resource (for example projects) and then querying child endpoints per parent (for example project issues).
Prefer dependent resources when you have a single parent→child. Use
rest_api_resources
with a parent resource and a child that declares
type: "resolve"
for the parent field (e.g. parent slug or id). The shared infra (
rest_source/__init__.py
,
config_setup.process_parent_data_item
) paginates the parent and calls the child per parent row. Add
include_from_parent
so child rows get parent fields; they are injected as
_<parent>_<field>
via
make_parent_key_name
.
Make fan-out declarative in endpoint config. Add a fan-out config object in
settings.py
(for example
DependentEndpointConfig
) with:
  • parent_name
  • resolve_param
  • resolve_field
  • include_from_parent
  • optional parent field renames (e.g.
    id -> project_id
    )
  • optional parent endpoint params (for parent-specific defaults)
Then route all single-hop fan-out endpoints through a shared helper (for example
common/rest_source/fanout.py:build_dependent_resource
) so callers do not reimplement parent/child config assembly.
Parent field rename mapping belongs in the helper. If a helper supports declarative renames, apply the map there. Callers should not branch on whether renames exist.
Use per-endpoint pagination/selectors through fan-out helper overrides.
build_dependent_resource
supports optional endpoint overrides so you can keep single-hop fan-out declarative even when parent and child have different response shapes/pagination contracts:
  • parent_endpoint_extra
    and
    child_endpoint_extra
    : pass endpoint-level
    paginator
    and
    data_selector
    (for wrapped payloads like
    {"items": [...]}
    ).
  • page_size_param
    : override default page-size query param (
    limit
    ) for APIs that use a different name (for example
    page_size
    ).
This means you can often avoid custom iterators for single-hop fan-out even when parent and child paginate differently (e.g. Typeform forms page-number + responses cursor token).
Path pre-formatting: Child paths often have multiple placeholders (e.g. org and resource slug).
process_parent_data_item
only does
str.format()
with the resolved param. Pre-format any static placeholders with
.replace()
on the child path before passing to the resource config, so only the resolved placeholder remains and DLT does not raise
KeyError
.
When to keep a custom iterator: If fan-out requires two or more levels (e.g. parent → mid-level list → detail per mid-level), where an intermediate API call discovers values that become part of the URL, that cannot be expressed as a single parent→child in
rest_api_resources
. Implement a custom HTTP iterator for that endpoint only; reuse the same pagination/retry helpers as elsewhere.
扇出是指遍历父资源(例如项目),然后为每个父资源查询子端点(例如项目问题)。
当只有单个父→子关系时,优先使用依赖资源。 使用
rest_api_resources
,包含一个父资源和一个为父字段声明
type: "resolve"
的子资源(例如父slug或id)。共享基础设施(
rest_source/__init__.py
config_setup.process_parent_data_item
)会分页父资源,并为每个父行调用子资源。添加
include_from_parent
,使子行获取父字段;它们会通过
make_parent_key_name
被注入为
_<parent>_<field>
在端点配置中声明式定义扇出。
settings.py
中添加扇出配置对象(例如
DependentEndpointConfig
),包含:
  • parent_name
  • resolve_param
  • resolve_field
  • include_from_parent
  • 可选的父字段重命名(例如
    id -> project_id
  • 可选的父端点参数(针对父资源的特定默认值)
然后通过共享辅助函数(例如
common/rest_source/fanout.py:build_dependent_resource
)路由所有单跳扇出端点,这样调用者无需重新实现父/子配置组装。
父字段重命名映射应放在辅助函数中。 如果辅助函数支持声明式重命名,在此处应用映射。调用者不应根据是否存在重命名进行分支处理。
通过扇出辅助函数的覆盖项使用端点特定的分页/选择器。
build_dependent_resource
支持可选的端点覆盖,因此即使父和子具有不同的响应结构/分页约定,也能保持单跳扇出的声明式:
  • parent_endpoint_extra
    child_endpoint_extra
    :传递端点级别的
    paginator
    data_selector
    (用于包装的负载,例如
    {"items": [...]}
    )。
  • page_size_param
    :覆盖默认的页面大小查询参数(
    limit
    ),适用于使用不同名称的API(例如
    page_size
    )。
这意味着即使父和子的分页方式不同(例如Typeform表单使用页码 + 响应使用游标令牌),通常也可以避免为单跳扇出实现自定义迭代器。
路径预格式化: 子路径通常有多个占位符(例如组织和资源slug)。
process_parent_data_item
仅对已解析的参数执行
str.format()
。在传递给资源配置之前,使用
.replace()
预格式化子路径中的任何静态占位符,这样仅保留已解析的占位符,避免DLT抛出
KeyError
何时保留自定义迭代器: 如果扇出需要两级或更多级别(例如父→中间级列表→每个中间级的详情),其中中间API调用发现的值会成为URL的一部分,无法在
rest_api_resources
中表示为单个父→子关系。仅为该端点实现自定义HTTP迭代器;重用其他地方的相同分页/重试辅助函数。

Testing expectations

测试要求

Add at least two test modules:
  • tests/test_<source>_source.py
    :
    • source_type
    • get_source_config
      fields and labels
    • get_schemas
      outputs
    • validate_credentials
      success/failure
    • source_for_pipeline
      argument plumbing
  • tests/test_<source>.py
    :
    • paginator behavior from API response headers/body
    • resource generation for incremental vs non-incremental
    • endpoint-specific primary key mapping
    • credential validation status mapping
    • mapper/filter helpers if present
    • fan-out endpoint row format assertions (dict shape + parent identifiers)
    • for dependent-resource fan-out: mock
      rest_api_resources
      , pass rows with
      _<parent>_<field>
      keys to exercise parent-field injection and rename behavior
    • expected return schema checks for each declared endpoint in
      settings.py
Prefer behavior tests over config-shape tests. Avoid brittle assertions on internal config dict structure unless they protect a known regression that cannot be asserted via output behavior.
Use parameterized tests for status codes and edge cases.
至少添加两个测试模块:
  • tests/test_<source>_source.py
    • source_type
    • get_source_config
      字段和标签
    • get_schemas
      输出
    • validate_credentials
      成功/失败情况
    • source_for_pipeline
      参数传递
  • tests/test_<source>.py
    • API响应头/主体的分页器行为
    • 增量与非增量的资源生成
    • 端点特定的主键映射
    • 凭证验证状态映射
    • 映射器/过滤器辅助函数(如果存在)
    • 扇出端点行格式断言(字典结构 + 父标识符)
    • 对于依赖资源扇出:模拟
      rest_api_resources
      ,传递带有
      _<parent>_<field>
      键的行,以验证父字段注入和重命名行为
    • settings.py
      中每个声明的端点进行预期返回schema检查
优先选择行为测试而非配置结构测试。除非是为了保护无法通过输出行为断言的已知回归,否则避免对内部配置字典结构进行脆弱的断言。
对状态码和边缘情况使用参数化测试。

Validation and generation workflow

验证和生成工作流

After changing source fields, run the generation commands from the checklist and targeted tests for the new source.
更改源字段后,运行检查清单中的生成命令,并针对新源运行定向测试。

Common pitfalls

常见陷阱

  • Source not visible in wizard: source not registered/imported, or schema not rebuilt.
  • Generated config class still empty: forgot
    generate:source-configs
    after updating fields.
  • Incremental sync misbehaving: wrong field name/type or wrong sort assumptions.
  • Endless retries for bad credentials: missing
    get_non_retryable_errors
    .
  • Dependent resource path
    KeyError
    : pre-format static path placeholders (see Fan-out).
  • Silent truncation risk: page caps hit without logs/metrics.
  • Drift from refactors: unused function params/helpers left behind after endpoint behavior changes.
  • Type drift in endpoint config dicts: use source typing aliases (
    Endpoint
    ,
    ClientConfig
    ,
    IncrementalConfig
    ) to keep static checks precise.
  • 源在向导中不可见:源未注册/导入,或schema未重建。
  • 生成的配置类仍为空:更新字段后忘记运行
    generate:source-configs
  • 增量同步行为异常:字段名称/类型错误或排序假设错误。
  • 错误凭证导致无限重试:缺少
    get_non_retryable_errors
  • 依赖资源路径
    KeyError
    :未预格式化静态路径占位符(参见扇出部分)。
  • 静默截断风险:达到页面上限但无日志/指标。
  • 重构导致偏离:端点行为更改后留下未使用的函数参数/辅助函数。
  • 端点配置字典中的类型漂移:使用源类型别名(
    Endpoint
    ClientConfig
    IncrementalConfig
    )保持静态检查精确。