Implementing Data warehouse sources
Use this skill when building or updating Data warehouse sources in
posthog/temporal/data_imports/sources/
.
Read first
Before coding, read:
posthog/temporal/data_imports/sources/source.template
posthog/temporal/data_imports/sources/README.md
- 1 API source with + transport logic (e.g. klaviyo, github). For dependent-resource fan-out (parent→child with ), also read
posthog/temporal/data_imports/sources/common/rest_source/__init__.py
and (e.g. , ).
Source architecture contract
For API-backed sources, use this split:
- : source registration, source form fields, schema list, credential validation, and pipeline handoff.
- : endpoint catalog, incremental fields, primary key and partition defaults.
- : API client/auth, paginator, request params, row normalization, and .
This keeps endpoint behavior declarative and easy to extend.
For REST sources that mix top-level and fan-out endpoints, keep endpoint metadata in
and route in
with this priority:
- endpoint-specific custom iterators (only when required),
- generic fan-out helper path,
- top-level endpoint path.
Implementation checklist
Copy this and track progress:
text
Source implementation:
- [ ] Define source fields in `get_source_config`
- [ ] Implement credential validation
- [ ] Define schemas in `get_schemas`
- [ ] Add/confirm endpoint settings (`settings.py`)
- [ ] Implement transport and paginator (`{source}.py`)
- [ ] Return correct `SourceResponse` (keys, partitioning, sort mode)
- [ ] Add non-retryable auth/permission errors
- [ ] Add source tests
- [ ] Add transport tests
- [ ] Add icon in `frontend/public/services/`
- [ ] Run `pnpm run generate:source-configs`
- [ ] Run `pnpm run schema:build`
- [ ] For Beta: set `betaSource=True` in `SourceConfig`; omit `unreleasedSource` (or set `False`) when releasing.
Required coding conventions
- Register with .
- Source class should inherit
SimpleSource[GeneratedConfig]
unless resumable/webhook behavior is required.
- API sources should usually return in endpoint resources.
- Use for incremental merge safety; they are endpoint-specific (declare in , not always ).
- Add partitioning for new sources where possible:
- API sources:
partition_mode="datetime"
with stable datetime field when available.
- Add
get_non_retryable_errors()
for known permanent failures (401/403/invalid credentials).
- Keep comments minimal and only when intent is not obvious.
Incremental sync guidance
- If API supports server-side time filtering, add it and map from
db_incremental_field_last_value
.
- If API only supports cursor pagination, still declare incremental fields if reliable and let merge semantics dedupe.
- Set only if the endpoint truly returns descending order and cannot return ascending.
- For descending sources, make sure behavior with
db_incremental_field_earliest_value
is considered.
- Default unknown endpoints to full refresh first; only enable incremental after confirming a stable filter field and API semantics.
- Prefer immutable partition keys (, , ) over mutable fields (, ) when both exist.
- Confirm partition keys against response schemas, not assumptions from endpoint names.
API behavior verification checklist
Before finalizing endpoint logic, verify these from docs (or reliable API examples):
- Response shape: list vs object vs wrapped data ().
- Pagination contract: Link header vs body cursor vs offset/page; next-page termination signal.
- Ordering guarantees: ascending/descending/undefined for key time fields.
- Rate limit headers and semantics (window reset timestamp, concurrent limits).
- Field stability: whether candidate incremental/partition fields can change over time.
If behavior is not documented, keep parsing/merge logic conservative and add a code comment documenting the uncertainty.
Endpoint inventory workflow
- Build an endpoint inventory before expanding coverage:
- endpoint path and auth scopes,
- grain (org/project/child fan-out),
- pagination style,
- primary key shape (single/composite),
- incremental candidate fields.
- Keep the inventory in source-local docs (for example
posthog/temporal/data_imports/sources/<source>/api_inventory.md
) so future endpoint additions stay consistent.
- Add endpoints in phases:
- org-level list endpoints first,
- then project-level fan-out,
- then child/fan-out endpoints with bounded pagination.
Top-level endpoints (org/account level)
Top-level endpoints are list/read endpoints that do not require parent-row expansion.
- Declare endpoint metadata in (, , , , ).
- Build them through a single resource config ( style helper) and keep transport branches minimal.
- Keep endpoint params declarative and stable (, required filters).
- Use merge write disposition only when incremental semantics are reliable; otherwise full replace is safer.
Pagination tips
- Some APIs use cursor pagination in headers — check both and any results flag the API may use.
- When following a full cursor URL from response headers, clear request params in paginator to avoid duplicate query params.
- For parent/child fan-out, keep hard page caps per parent resource to avoid unbounded scans.
- Emit structured logs when page caps are reached (include resource name and parent identifiers) so operators can tune limits safely.
Retry and throttling strategy
- Use a retry framework (for example tenacity) instead of manual retry loops where possible.
- Retry transport failures and retryable status codes (, transient ).
- Prefer server-provided rate-limit reset headers for wait calculation on ; fall back to exponential backoff when unavailable.
- Keep retries bounded and deterministic (), and preserve clear terminal behavior:
- return final response for retried status responses when useful for downstream handling, or
- raise final exception for transport failures.
- Keep timeout and retry settings near the top of the module for easy operator tuning.
Fan-out endpoints
Fan-out means iterating a parent resource (for example projects) and then querying child endpoints per parent (for example project issues).
Prefer dependent resources when you have a single parent→child. Use
with a parent resource and a child that declares
for the parent field (e.g. parent slug or id). The shared infra (
,
config_setup.process_parent_data_item
) paginates the parent and calls the child per parent row. Add
so child rows get parent fields; they are injected as
via
.
Make fan-out declarative in endpoint config. Add a fan-out config object in
(for example
) with:
- optional parent field renames (e.g. )
- optional parent endpoint params (for parent-specific defaults)
Then route all single-hop fan-out endpoints through a shared helper (for example
common/rest_source/fanout.py:build_dependent_resource
) so callers do not reimplement parent/child config assembly.
Parent field rename mapping belongs in the helper. If a helper supports declarative renames, apply the map there. Callers should not branch on whether renames exist.
Use per-endpoint pagination/selectors through fan-out helper overrides. supports optional endpoint overrides so you can keep single-hop fan-out declarative even when parent and child have different response shapes/pagination contracts:
- and : pass endpoint-level and (for wrapped payloads like ).
- : override default page-size query param () for APIs that use a different name (for example ).
This means you can often avoid custom iterators for single-hop fan-out even when parent and child paginate differently (e.g. Typeform forms page-number + responses cursor token).
Path pre-formatting: Child paths often have multiple placeholders (e.g. org and resource slug).
only does
with the
resolved param. Pre-format any static placeholders with
on the child path before passing to the resource config, so only the resolved placeholder remains and DLT does not raise
.
When to keep a custom iterator: If fan-out requires two or more levels (e.g. parent → mid-level list → detail per mid-level), where an intermediate API call discovers values that become part of the URL, that cannot be expressed as a single parent→child in
. Implement a custom HTTP iterator for that endpoint only; reuse the same pagination/retry helpers as elsewhere.
Testing expectations
Add at least two test modules:
tests/test_<source>_source.py
:
- fields and labels
- outputs
- success/failure
- argument plumbing
- :
- paginator behavior from API response headers/body
- resource generation for incremental vs non-incremental
- endpoint-specific primary key mapping
- credential validation status mapping
- mapper/filter helpers if present
- fan-out endpoint row format assertions (dict shape + parent identifiers)
- for dependent-resource fan-out: mock , pass rows with keys to exercise parent-field injection and rename behavior
- expected return schema checks for each declared endpoint in
Prefer behavior tests over config-shape tests. Avoid brittle assertions on internal config dict structure unless they protect a known regression that cannot be asserted via output behavior.
Use parameterized tests for status codes and edge cases.
Validation and generation workflow
After changing source fields, run the generation commands from the checklist and targeted tests for the new source.
Common pitfalls
- Source not visible in wizard: source not registered/imported, or schema not rebuilt.
- Generated config class still empty: forgot after updating fields.
- Incremental sync misbehaving: wrong field name/type or wrong sort assumptions.
- Endless retries for bad credentials: missing .
- Dependent resource path : pre-format static path placeholders (see Fan-out).
- Silent truncation risk: page caps hit without logs/metrics.
- Drift from refactors: unused function params/helpers left behind after endpoint behavior changes.
- Type drift in endpoint config dicts: use source typing aliases (, , ) to keep static checks precise.