START: User wants to create a dlt pipeline
│
├─→ Is there a dlt verified source available for this platform?
│ (Check: https://dlthub.com/docs/dlt-ecosystem/verified-sources)
│ │
│ YES → Use VERIFIED SOURCE approach
│ │ Examples: Salesforce, GitHub, Stripe, HubSpot, Slack
│ │ Action: Guide user through `dlt init <source> <destination>`
│ │
│ NO → Continue to next question
│
├─→ Is this a REST API with standard patterns?
│ (Standard auth, pagination, JSON responses)
│ │
│ YES → Use DECLARATIVE REST API approach
│ │ Examples: Pokemon API, simple REST APIs with clear endpoints
│ │ Action: Create config-based pipeline with rest_api_source
│ │
│ NO → Continue to next question
│
└─→ Does this require custom logic or Python packages?
│
YES → Use CUSTOM PYTHON approach
Examples: Python packages (simple-salesforce), complex transformations,
non-standard APIs, custom data sources
          Action: Create custom source with @dlt.source and @dlt.resource decorators
**Verified source setup**

Initialize a pipeline from a verified source:

```shell
dlt init <source_name> <destination_name>
```

Examples:

```shell
dlt init salesforce bigquery
dlt init github duckdb
dlt init stripe snowflake
```

**Templates**

For the other two approaches, start from `declarative_rest_pipeline.py` (declarative REST API) or `custom_python_pipeline.py` (custom Python).

**Install dependencies**

```shell
python scripts/install_packages.py --destination <destination_name>
```

or install directly:

```shell
pip install "dlt[<destination>,workspace]"
```

where `<destination>` is e.g. `bigquery` or `snowflake`. The `workspace` extra (`dlt[workspace]`) is required for `dlt pipeline <name> show`.

**Credentials**

Store credentials in `.dlt/secrets.toml` under `[sources.<source_name>]`.
Use the template: [assets/templates/.dlt/secrets.toml](assets/templates/.dlt/secrets.toml)
**Important**: Remind user to add `.dlt/secrets.toml` to `.gitignore`!
**Note for DuckDB**: DuckDB doesn't require credentials in secrets.toml. Just specify the database file path in the pipeline or config.toml.
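As an illustration, credentials for a Salesforce source might look like the sketch below. The exact key names vary per source (check the source's own documentation), so treat these field names as assumptions:

```toml
[sources.salesforce]
user_name = "you@example.com"
password = "..."
security_token = "..."
```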
**Configuration**

Non-sensitive settings go in `.dlt/config.toml`:

```toml
[sources.<source_name>]
base_url = "https://api.example.com"
timeout = 30

[destination.<destination_name>]
location = "US"
```

**Customize and run**

- Select specific resources with `.with_resources()` and adjust schemas with `.apply_hints()`; control table layout with `max_table_nesting` and `table_name` on `@dlt.source` / `@dlt.resource`.
- Use the `merge` write disposition for incremental, deduplicated loads.
- Run the pipeline with `python <pipeline_file>.py`.
- To inspect results, install the `workspace` extra (`install_packages.py` handles this), then run `dlt pipeline <pipeline_name> show` or `python scripts/open_dashboard.py <pipeline_name>`.

**Example: verified source pipeline**

```python
import dlt
from salesforce import salesforce_source

source = salesforce_source()

pipeline = dlt.pipeline(
    pipeline_name='salesforce_pipeline',
    destination='bigquery',
    dataset_name='salesforce_data'
)
pipeline.run(source)
```
**Example: declarative REST API pipeline**

```python
import dlt
from dlt.sources.rest_api import rest_api_source

config = {
    "client": {
        "base_url": "https://pokeapi.co/api/v2/",
    },
    "resources": [
        "pokemon",
        {
            "name": "pokemon_details",
            "endpoint": "pokemon/{name}",
            "write_disposition": "merge",
            "primary_key": "id"
        }
    ]
}

pipeline = dlt.pipeline(
    pipeline_name="pokemon",
    destination="duckdb",
    dataset_name="pokemon_data"
)
pipeline.run(rest_api_source(config))
```
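The child endpoint `pokemon/{name}` is resolved once per parent record. A plain-Python sketch of that expansion (sample records only, not the library's internals):

```python
# Hypothetical parent records, as the "pokemon" list endpoint might return them
parents = [{"name": "bulbasaur"}, {"name": "ivysaur"}]

# The child endpoint template from the config above
template = "pokemon/{name}"

# Each parent record produces one child request path
paths = [template.format(**record) for record in parents]
print(paths)  # ['pokemon/bulbasaur', 'pokemon/ivysaur']
```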
**Example: custom Python source**

```python
import dlt
from simple_salesforce import Salesforce

@dlt.source
def salesforce_custom(username=dlt.secrets.value, password=dlt.secrets.value):
    sf = Salesforce(username=username, password=password)

    @dlt.resource(write_disposition='merge', primary_key='Id')
    def accounts():
        records = sf.query_all("SELECT Id, Name FROM Account")
        yield records['records']

    return accounts

pipeline = dlt.pipeline(
    pipeline_name='salesforce_custom',
    destination='duckdb',
    dataset_name='salesforce'
)
pipeline.run(salesforce_custom())
```
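The `merge` write disposition deduplicates on the primary key, so re-running a pipeline updates existing rows instead of duplicating them. A plain-Python sketch of the idea (dlt's actual merge runs in the destination; this is illustrative only):

```python
def merge_by_key(existing, incoming, key):
    # Later records with the same key replace earlier ones,
    # mirroring what a merge write disposition achieves in the destination
    table = {row[key]: row for row in existing}
    table.update({row[key]: row for row in incoming})
    return list(table.values())

existing = [{"Id": "001", "Name": "Acme"}]
incoming = [{"Id": "001", "Name": "Acme Corp"}, {"Id": "002", "Name": "Globex"}]
print(merge_by_key(existing, incoming, "Id"))
# [{'Id': '001', 'Name': 'Acme Corp'}, {'Id': '002', 'Name': 'Globex'}]
```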
**Example: incremental loading with the REST API source**

The `since` query parameter is bound to the incremental cursor via the `{incremental.start_value}` placeholder, so each run only requests issues updated after the last load:

```python
import dlt

config = {
    "client": {
        "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
        "auth": {"token": dlt.secrets["github_token"]}
    },
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",
                "params": {
                    "state": "all",
                    "since": "{incremental.start_value}"
                },
                "incremental": {
                    "cursor_path": "updated_at",
                    "initial_value": "2024-01-01T00:00:00Z"
                }
            },
            "write_disposition": "merge",
            "primary_key": "id"
        }
    ]
}
```
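Conceptually, an incremental load keeps only records whose cursor field advanced past the stored high-water mark, then records the new maximum. dlt tracks this state for you; the function below is a minimal sketch of the idea, not dlt's implementation:

```python
def incremental_filter(records, cursor_path, last_value):
    # Keep records whose cursor field is newer than the stored value,
    # and compute the new high-water mark for the next run.
    # ISO-8601 timestamps compare correctly as strings.
    fresh = [r for r in records if r[cursor_path] > last_value]
    new_value = max((r[cursor_path] for r in fresh), default=last_value)
    return fresh, new_value

records = [
    {"id": 1, "updated_at": "2024-01-05T00:00:00Z"},
    {"id": 2, "updated_at": "2023-12-01T00:00:00Z"},
]
fresh, cursor = incremental_filter(records, "updated_at", "2024-01-01T00:00:00Z")
print(len(fresh), cursor)  # 1 2024-01-05T00:00:00Z
```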
**Accessing loaded data**

To query data a pipeline has already loaded, re-attach to it by name with `dlt.attach()`. With DuckDB as the destination, the database file can also be opened directly with the `duckdb` package.

```python
import dlt

# Attach to a previously run pipeline (e.g. the "pokemon" pipeline above)
pipeline = dlt.attach(pipeline_name="pokemon")
```
See: [references/rest-api-source.md](references/rest-api-source.md) (Non-REST Endpoint Resources, Query/Path Params, Single-Object Responses, include_from_parent).
**Key reminders**

- Add `.dlt/secrets.toml` to `.gitignore` so credentials never reach version control.
- Pick the right write disposition per resource: `append`, `merge`, or `replace`.
- For OAuth2, use `"auth": {"type": "oauth2_client_credentials", ...}` in REST API configs, or `dlt.sources.helpers.rest_client.auth.OAuth2ClientCredentials` with `paginate()` in custom sources.
- The `workspace` extra is required for `dlt pipeline <name> show`.