Creates and maintains dlt (data load tool) pipelines from APIs, databases, and other sources. Use when the user wants to build or debug pipelines; use verified sources (e.g. Salesforce, GitHub, Stripe), the declarative REST API source, or custom Python; configure destinations (e.g. DuckDB, BigQuery, Snowflake); implement incremental loading; or edit .dlt config and secrets. Also use when the user mentions data ingestion, dlt pipeline, dlt init, rest_api_source, incremental load, or pipeline dashboard.
npx skill4agent add untitled-data-company/dlt-skill dlt-skill

START: User wants to create a dlt pipeline
│
├─→ Is there a dlt verified source available for this platform?
│ (Check: https://dlthub.com/docs/dlt-ecosystem/verified-sources)
│ │
│ YES → Use VERIFIED SOURCE approach
│ │ Examples: Salesforce, GitHub, Stripe, HubSpot, Slack
│ │ Action: Guide user through `dlt init <source> <destination>`
│ │
│ NO → Continue to next question
│
├─→ Is this a REST API with standard patterns?
│ (Standard auth, pagination, JSON responses)
│ │
│ YES → Use DECLARATIVE REST API approach
│ │ Examples: Pokemon API, simple REST APIs with clear endpoints
│ │ Action: Create config-based pipeline with rest_api_source
│ │
│ NO → Continue to next question
│
└─→ Does this require custom logic or Python packages?
│
YES → Use CUSTOM PYTHON approach
Examples: Python packages (simple-salesforce), complex transformations,
non-standard APIs, custom data sources
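Action: Create custom source with @dlt.source and @dlt.resource decorators

Verified source: scaffold the project with dlt init <source_name> <destination_name>, for example:
dlt init salesforce bigquery
dlt init github duckdb
dlt init stripe snowflake

Declarative REST API and custom Python pipelines start from the templates declarative_rest_pipeline.py and custom_python_pipeline.py.

Install dependencies for the chosen destination:
python scripts/install_packages.py --destination <destination_name>
or directly:
pip install "dlt[<destination>,workspace]"
For example, use dlt[bigquery,workspace] or dlt[snowflake,workspace]. The workspace extra (dlt[workspace]) is needed for the pipeline dashboard opened with dlt pipeline <name> show.

Credentials go in .dlt/secrets.toml:
[sources.<source_name>]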
# Source credentials here
[destination.<destination_name>]
# Destination credentials here

Never commit .dlt/secrets.toml: add it to .gitignore.

Non-secret settings go in .dlt/config.toml:
[sources.<source_name>]
base_url = "https://api.example.com"
timeout = 30
[destination.<destination_name>]
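location = "US"

Customizing sources: select resources with .with_resources(), adjust table settings (such as table_name or the write disposition) with .apply_hints(), and limit how deeply nested data is unpacked into child tables with max_table_nesting. Custom sources are built with @dlt.source and @dlt.resource. Use the merge write disposition with a primary key to update existing rows instead of appending duplicates.

Run the pipeline:
python <pipeline_file>.py

Inspect the loaded data in the pipeline dashboard (requires the dlt[workspace] extra, installable with install_packages.py or pip):
dlt pipeline <pipeline_name> show
or
python scripts/open_dashboard.py <pipeline_name>

Example: verified source (Salesforce to BigQuery)
import dlt
from salesforce import salesforce_source

source = salesforce_source()
pipeline = dlt.pipeline(
    pipeline_name='salesforce_pipeline',
    destination='bigquery',
    dataset_name='salesforce_data'
)
# Load only specific Salesforce objects (the verified source names its resources in lowercase)
pipeline.run(source.with_resources("account", "opportunity", "contact"))

Example: declarative REST API (PokeAPI to DuckDB)
import dlt
from dlt.sources.rest_api import rest_api_source

config = {
    "client": {
        "base_url": "https://pokeapi.co/api/v2/",
    },
    "resources": [
        "pokemon",
        {
            "name": "pokemon_details",
            # {resources.pokemon.name} is filled in from each row of the "pokemon" resource
            "endpoint": "pokemon/{resources.pokemon.name}",
            "write_disposition": "merge",
            "primary_key": "id",
        },
    ],
}

pipeline = dlt.pipeline(
    pipeline_name="pokemon",
    destination="duckdb",
    dataset_name="pokemon_data",
)
pipeline.run(rest_api_source(config))

Example: custom Python source (simple-salesforce to DuckDB)
import dlt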
from simple_salesforce import Salesforce
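
@dlt.source
def salesforce_custom(
    username=dlt.secrets.value,
    password=dlt.secrets.value,
    security_token=dlt.secrets.value,
):
    # simple-salesforce password login expects a security token (or an organizationId)
    sf = Salesforce(username=username, password=password, security_token=security_token)

    @dlt.resource(write_disposition='merge', primary_key='Id')
    def accounts():
        records = sf.query_all("SELECT Id, Name FROM Account")
        yield records['records']

    return accounts

pipeline = dlt.pipeline(
    pipeline_name='salesforce_custom',
    destination='duckdb',
    dataset_name='salesforce'
)
pipeline.run(salesforce_custom())

Example: incremental loading with the declarative REST API (GitHub issues)
import dlt
from dlt.sources.rest_api import rest_api_source

config = {
    "client": {
        "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
        "auth": {"token": dlt.secrets["github_token"]},
    },
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",
                "params": {
                    "state": "all",
                    "since": "{incremental.start_value}",
                },
                # incremental is configured inside the endpoint, next to the params that use it
                "incremental": {
                    "cursor_path": "updated_at",
                    "initial_value": "2024-01-01T00:00:00Z",
                },
            },
            "write_disposition": "merge",
            "primary_key": "id",
        }
    ],
}

To work with a pipeline that has already run (for example, to query the data it loaded), attach to it by name with dlt.attach().

Example: resolving request parameters from a database seed resource (weather API)
import duckdb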
import dlt
from dlt.sources.rest_api import rest_api_source

# 1. Pre-fetch data from the database (outside the dlt context)
def get_locations():
    conn = duckdb.connect("locations.duckdb", read_only=True)
    try:
        result = conn.execute("SELECT id, lat, lng FROM locations").fetchall()
    finally:
        conn.close()
    return [{"id": r[0], "lat": r[1], "lng": r[2]} for r in result]

# 2. Create the seed resource; selected=False keeps it out of the destination
@dlt.resource(selected=False)
def locations():
    yield get_locations()  # Yield the rows as one list

# 3. Configure the REST API resource that resolves its params from the seed resource
config = {
    "client": {"base_url": "https://api.weather.com/"},
    "resources": [
        locations(),
        {
            "name": "weather",
            "endpoint": {
                "path": "forecast",
                "params": {
                    "lat": "{resources.locations.lat}",
                    "lng": "{resources.locations.lng}",
                },
                "data_selector": "$",
                "paginator": "single_page",
            },
            "include_from_parent": ["id"],
            "primary_key": "_locations_id",
        },
    ],
}

source = rest_api_source(config)
pipeline = dlt.pipeline(
    pipeline_name="weather",
    destination="duckdb",
    dataset_name="weather_data",
)
pipeline.run(source)

Notes:
- Keep credentials in .dlt/secrets.toml and list that file in .gitignore.
- Write dispositions: append (the default), merge (update on primary_key), and replace.
- OAuth2 client credentials: set "auth": {"type": "oauth2_client_credentials", ...} in a declarative REST API config, or use dlt.sources.helpers.rest_client.auth.OAuth2ClientCredentials in custom code (for example together with the REST client's paginate()).
- The pipeline dashboard (dlt pipeline <name> show) needs the workspace extra.
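The three write dispositions (append, merge, replace) can be modeled in plain Python to show what each does to a table that already holds data. This is a simplified sketch, not dlt's implementation. The in-memory list stands in for a destination table, and apply_disposition is a hypothetical helper. Real merge loads do more, for example deduplicating incoming rows within a load, and this sketch ignores that.

```python
from typing import Any

Row = dict[str, Any]


def apply_disposition(table: list[Row], new_rows: list[Row], disposition: str,
                      primary_key: str = "id") -> list[Row]:
    """Return the table contents after loading new_rows (hypothetical model, not dlt code)."""
    if disposition == "append":
        # append: keep everything already loaded and add the new rows
        return table + new_rows
    if disposition == "replace":
        # replace: discard the previous contents entirely
        return list(new_rows)
    if disposition == "merge":
        # merge: rows with an existing key are overwritten, new keys are added,
        # and rows not present in the new load are kept
        merged = {row[primary_key]: row for row in table}
        for row in new_rows:
            merged[row[primary_key]] = row
        return list(merged.values())
    raise ValueError(f"unknown write disposition: {disposition}")


existing = [{"id": 1, "name": "bulbasaur"}, {"id": 2, "name": "ivysaur"}]
incoming = [{"id": 2, "name": "ivysaur (updated)"}, {"id": 3, "name": "venusaur"}]

for disposition in ("append", "replace", "merge"):
    print(disposition, apply_disposition(existing, incoming, disposition))
```

With append, the re-fetched row for id 2 shows up twice. This is why the examples that reload overlapping data use merge with a primary_key.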
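The GitHub incremental configuration depends on a cursor. dlt remembers the highest updated_at value it has seen, substitutes it for {incremental.start_value} on the next run, and skips older records. The loop below is a plain-Python sketch of that idea under stated assumptions. fetch_issues is a hypothetical stand-in for the API call, and the dict named state stands in for dlt's pipeline state. Cursor values are ISO 8601 UTC strings in a single format, so they compare correctly as text. dlt's own incremental also deduplicates records whose cursor equals the stored value. This sketch does not: it re-yields them, which is harmless when loading with merge on a primary key.

```python
from typing import Any

# Hypothetical in-memory "API": GitHub-style issues with an updated_at cursor
ISSUES = [
    {"id": 1, "updated_at": "2024-01-05T10:00:00Z"},
    {"id": 2, "updated_at": "2024-02-10T08:30:00Z"},
    {"id": 3, "updated_at": "2024-03-01T12:00:00Z"},
]


def fetch_issues(since: str) -> list[dict[str, Any]]:
    """Stand-in for GET issues?since=...; returns issues updated at or after `since`."""
    return [issue for issue in ISSUES if issue["updated_at"] >= since]


def run_incremental(state: dict[str, str], initial_value: str) -> list[dict[str, Any]]:
    # start value: the cursor stored by the previous run, else the initial value
    start_value = state.get("last_value", initial_value)
    records = fetch_issues(since=start_value)
    if records:
        # remember the highest cursor value for the next run
        state["last_value"] = max(r["updated_at"] for r in records)
    return records


state: dict[str, str] = {}
first = run_incremental(state, initial_value="2024-02-01T00:00:00Z")
print([r["id"] for r in first])   # → [2, 3]
ISSUES.append({"id": 4, "updated_at": "2024-04-15T09:00:00Z"})
second = run_incremental(state, initial_value="2024-02-01T00:00:00Z")
print([r["id"] for r in second])  # → [3, 4] (issue 3 sits exactly on the stored cursor)
```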
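In the weather example, dlt calls the forecast endpoint once for each row that the locations seed resource yields. It fills {resources.locations.lat} and {resources.locations.lng} from that row. Because of include_from_parent, it also copies the parent's id into each child record as _locations_id, which is the column used as the primary key. The sketch below reproduces only that fan-out in plain Python. expand_requests and fake_forecast are hypothetical names, and the real HTTP call, pagination, and data selection are omitted. It also assumes each param value is a single placeholder.

```python
from typing import Any, Callable

Row = dict[str, Any]


def expand_requests(parents: list[Row], params_template: dict[str, str],
                    fetch: Callable[[dict[str, Any]], Row],
                    parent_name: str, include_from_parent: list[str]) -> list[Row]:
    """Call `fetch` once per parent row and tag each result with parent fields."""
    results = []
    for parent in parents:
        # Fill "{resources.<parent>.<field>}" placeholders from this parent row
        params = {}
        for key, template in params_template.items():
            field = template.strip("{}").split(".")[-1]
            params[key] = parent[field]
        record = dict(fetch(params))
        # include_from_parent: copy parent fields as _<parent>_<field>
        for field in include_from_parent:
            record[f"_{parent_name}_{field}"] = parent[field]
        results.append(record)
    return results


def fake_forecast(params: dict[str, Any]) -> Row:
    # Stand-in for GET forecast?lat=...&lng=...
    return {"lat": params["lat"], "lng": params["lng"], "temp_c": 20.0}


locations = [{"id": 1, "lat": 52.52, "lng": 13.40}, {"id": 2, "lat": 48.85, "lng": 2.35}]
weather = expand_requests(
    locations,
    {"lat": "{resources.locations.lat}", "lng": "{resources.locations.lng}"},
    fake_forecast,
    parent_name="locations",
    include_from_parent=["id"],
)
for row in weather:
    print(row)
```

Each child record carries the parent's id, so it stays unique per location even though the forecast response itself has no id field.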