# Turbo Transforms
Write, understand, and debug SQL, TypeScript, and dynamic table transforms for Turbo pipeline configurations.
Identify what the user needs (decode, filter, reshape, combine, custom logic, or lookup joins), then use the relevant section below. After writing transforms, and especially when generating a complete pipeline YAML (not just a transform snippet), always validate with `goldsky turbo validate` before presenting it to the user:

```bash
goldsky turbo validate <pipeline.yaml>
```

Reference files for specialized topics:

- `references/typescript-transforms.md` — TypeScript/WASM script transforms and handler transforms
- `references/dynamic-tables.md` — Dynamic table transforms (allowlists, lookup joins)
- `references/solana-patterns.md` — Solana instruction/log decoding and function examples (array, JSON, hex)
## Transform Basics
### YAML Structure
```yaml
transforms:
  my_transform:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, address
      FROM my_source
      WHERE address = '0xabc...'
```

### Required Fields
| Field | Required | Description |
|---|---|---|
| `type` | Yes | Transform type, e.g. `sql` |
| `primary_key` | Yes | Column used for uniqueness and ordering |
| `sql` | Yes | SQL query (for `type: sql`) |
### Referencing Data
- Reference sources by their YAML key name: `FROM my_source`
- Reference other transforms by their YAML key name: `FROM my_transform`
- No need for a `from:` field in SQL transforms — the `FROM` clause in SQL handles it
### SQL Streaming Limitations
Turbo SQL is powered by Apache DataFusion in streaming mode. The following are NOT supported:

- Joins — use `dynamic_table` transforms for lookup-style joins instead
- Aggregations (GROUP BY, COUNT, SUM, AVG) — use the `postgres_aggregate` sink instead
- Window functions (OVER, PARTITION BY, ROW_NUMBER)
- Subqueries — use transform chaining instead

CTEs (WITH...AS) are supported.
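Since CTEs are supported, logic that might otherwise reach for a subquery can usually be written as a `WITH` clause instead. A minimal sketch, assuming an illustrative source `my_source` with an `amount` column:

```sql
-- A CTE (supported) standing in for a nested subquery (not supported)
WITH large_transfers AS (
  SELECT id, block_number, address, amount
  FROM my_source
  WHERE amount > 1000000
)
SELECT id, block_number, address
FROM large_transfers
```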
### The `_gs_op` Column

Every record includes a `_gs_op` column that tracks the operation type: `'i'` (insert), `'u'` (update), `'d'` (delete). Preserve this column through transforms for correct upsert semantics in database sinks.
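For example, a projection that carries `_gs_op` alongside the business columns might look like this (column names other than `_gs_op` are illustrative):

```sql
SELECT
  id,
  block_number,
  address,
  _gs_op  -- keep the operation flag so the sink can apply inserts/updates/deletes correctly
FROM my_source
```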
## Goldsky SQL Functions
### `evm_log_decode()` — Decode Raw EVM Logs

Decodes raw Ethereum log data into structured event fields using an ABI specification.

Aliases: `_gs_log_decode` and `evm_decode_log` also work. Existing pipelines using `_gs_log_decode` are valid.
Syntax:

```sql
evm_log_decode(abi_json, topics, data) -> STRUCT<name: VARCHAR, event_params: LIST<VARCHAR>>
```

Parameters:
- `abi_json` — JSON string containing the ABI event definitions (or full contract ABI)
- `topics` — The `topics` column from `raw_logs`
- `data` — The `data` column from `raw_logs`

Returns a struct with:

- `decoded.name` (or `decoded.event_signature` via alias) — Event name (e.g., `'Transfer'`, `'OrderFilled'`)
- `decoded.event_params[N]` — Positional event parameters (1-indexed, returned as strings)
Example — Decode ERC-20 Transfer events:
```yaml
transforms:
  decoded_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        _gs_log_decode(
          '[{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"Transfer","type":"event"}]',
          topics,
          data
        ) AS decoded,
        id,
        block_number,
        transaction_hash,
        address,
        block_timestamp
      FROM my_raw_logs
```

Then filter by event in a downstream transform:
```yaml
transfers:
  type: sql
  primary_key: id
  sql: |
    SELECT
      id,
      block_number,
      block_timestamp,
      transaction_hash,
      address AS token_address,
      decoded.event_params[1] AS from_address,
      decoded.event_params[2] AS to_address,
      (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount
    FROM decoded_events
    WHERE decoded.event_signature = 'Transfer'
```

Tips for `_gs_log_decode()`:
- The ABI JSON must be a single-line string or use YAML `|` / `>-` block syntax
- Include all event ABIs you want to decode in a single array — events not matching are ignored
- Multiple events with the same name but different signatures (e.g., from different contracts) can coexist
- Use the source's `filter:` field to pre-filter by contract address before decoding (more efficient)
- Backtick-escape reserved words: Several column names conflict with SQL reserved words and must be escaped with backticks: `` `data` ``, `` `decoded` ``, `` `balance` ``, `` `owner_address` ``, `` `id` `` (in some contexts). When in doubt, backtick-escape column names that could be keywords.
Correct escaping example:

```sql
SELECT
  _gs_log_decode('[...]', topics, `data`) AS `decoded`,
  id, block_number, transaction_hash, address, block_timestamp
FROM my_raw_logs
```
### `fetch_abi()` — Fetch ABI/IDL from URL

Fetch an ABI or IDL from a remote URL. Cached internally for performance.
```sql
fetch_abi(url, format) -> VARCHAR
-- format: 'raw' for plain JSON, 'etherscan' for Etherscan API responses
```

Aliases: `_gs_fetch_abi`

Example:

```sql
evm_log_decode(
  fetch_abi('https://example.com/erc20.json', 'raw'),
  topics, data
) AS decoded
```
### `_gs_keccak256()` — Keccak256 Hash

Compute Keccak256 hash (same as Solidity's `keccak256`). Returns hex with `0x` prefix.
```sql
_gs_keccak256('Transfer(address,address,uint256)')
-- 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
```

### `xxhash()` — Fast Non-Cryptographic Hash
```sql
xxhash(concat(transaction_hash, '_', log_index::VARCHAR)) AS unique_id
```

### U256/I256 — 256-bit Integer Math
For precise token amount arithmetic without JavaScript precision loss.

```sql
-- Convert to/from U256
to_u256(value) -> U256
u256_to_string(u256_value) -> VARCHAR

-- Arithmetic (also available: u256_sub, u256_mul, u256_div, u256_mod)
u256_add(a, b) -> U256

-- Automatic operator rewriting: once values are cast to U256/I256,
-- standard operators (+, -, *, /, %) are auto-rewritten to function calls
to_u256(value) / to_u256('1000000000000000000') -- same as u256_div(...)
```

I256 (signed): `to_i256`, `i256_to_string`, `i256_add`, `i256_sub`, `i256_mul`, `i256_div`, `i256_mod`, `i256_neg`, `i256_abs`

Example — Convert wei to ETH:

```sql
u256_to_string(
  to_u256(evt.event_params[3]) / to_u256('1000000000000000000')
) AS amount_eth
```
### Solana Decode Functions
```sql
-- Decode instruction data using IDL (returns STRUCT<name, value>)
_gs_decode_instruction_data(idl_json, data)

-- Decode program log messages using IDL
_gs_decode_log_message(idl_json, log_messages)

-- Program-specific decoders:
gs_solana_decode_token_program_instruction(data)
gs_solana_decode_system_program_instruction(data)
gs_solana_get_accounts(transaction_data)
gs_solana_get_balance_changes(transaction_data)
gs_solana_decode_associated_token_program_instruction(data)
gs_solana_decode_stake_program_instruction(data)
gs_solana_decode_vote_program_instruction(data)

-- Base58 decoding
_gs_from_base58(base58_string) -> BINARY
```
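As a hedged sketch, a program-specific decoder can be applied directly to an instruction source; the source name `solana_instructions` and its columns are illustrative assumptions, not part of the reference above:

```sql
SELECT
  id,
  block_slot,
  -- decode SPL token-program instruction bytes into a structured value
  gs_solana_decode_token_program_instruction(data) AS decoded_ix
FROM solana_instructions
```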
-- 使用IDL解码指令数据(返回STRUCT<name, value>)
_gs_decode_instruction_data(idl_json, data)
-- 使用IDL解码程序日志消息
_gs_decode_log_message(idl_json, log_messages)
-- 特定程序解码器:
gs_solana_decode_token_program_instruction(data)
gs_solana_decode_system_program_instruction(data)
gs_solana_get_accounts(transaction_data)
gs_solana_get_balance_changes(transaction_data)
gs_solana_decode_associated_token_program_instruction(data)
gs_solana_decode_stake_program_instruction(data)
gs_solana_decode_vote_program_instruction(data)
-- Base58解码
_gs_from_base58(base58_string) -> BINARYArray Functions
```sql
-- Filter array elements by struct field matching a list of values
-- (prevents overflow panics by filtering BEFORE unnest)
array_filter_in(array, field_name, values_list) -> LIST

-- Convert list to large-list (i64 offsets) for very large arrays
to_large_list(array) -> LARGE_LIST

-- Filter array by field value
array_filter(array, field_name, value) -> LIST

-- Get first matching element
array_filter_first(array, field_name, value) -> STRUCT

-- Add index to each element
array_enumerate(array) -> LIST<STRUCT<index, value>>

-- Combine multiple arrays element-wise
zip_arrays(array1, array2, ...) -> LIST<STRUCT>
```
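For instance, `array_filter_in` can narrow a large instruction array to a few program IDs before any further processing. A sketch with illustrative source and column names (`my_transactions`, `instructions`, `program_id`):

```sql
SELECT
  id,
  array_filter_in(
    instructions,   -- array of STRUCTs
    'program_id',   -- struct field to match
    ['TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA']  -- keep only these values
  ) AS token_instructions
FROM my_transactions
```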
-- 按结构体字段匹配值列表过滤数组元素(在展开前过滤,防止溢出崩溃)
array_filter_in(array, field_name, values_list) -> LIST
-- 将列表转换为大列表(i64偏移量)以支持超大数组
to_large_list(array) -> LARGE_LIST
-- 按字段值过滤数组
array_filter(array, field_name, value) -> LIST
-- 获取第一个匹配元素
array_filter_first(array, field_name, value) -> STRUCT
-- 为每个元素添加索引
array_enumerate(array) -> LIST<STRUCT<index, value>>
-- 按元素位置合并多个数组
zip_arrays(array1, array2, ...) -> LIST<STRUCT>JSON Functions
```sql
json_query(json_string, path) -> VARCHAR   -- Query JSON by path
json_value(json_string, path) -> VARCHAR   -- Extract scalar value
json_exists(json_string, path) -> BOOLEAN  -- Check path exists
is_json(string) -> BOOLEAN                 -- Validate JSON
parse_json(json_string) -> JSON            -- Parse (errors on invalid)
try_parse_json(json_string) -> JSON        -- Parse (NULL on invalid)
json_object(key1, val1, ...) -> JSON       -- Construct JSON object
json_array(val1, val2, ...) -> JSON        -- Construct JSON array
```
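A small usage sketch, assuming an illustrative `metadata` column holding a JSON string:

```sql
SELECT
  id,
  json_value(metadata, '$.name')        AS nft_name,        -- scalar extraction
  json_exists(metadata, '$.attributes') AS has_attributes   -- path check
FROM my_source
WHERE is_json(metadata)  -- skip rows with malformed JSON
```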
json_query(json_string, path) -> VARCHAR -- 按路径查询JSON
json_value(json_string, path) -> VARCHAR -- 提取标量值
json_exists(json_string, path) -> BOOLEAN -- 检查路径是否存在
is_json(string) -> BOOLEAN -- 验证JSON格式
parse_json(json_string) -> JSON -- 解析JSON(格式错误时抛出异常)
try_parse_json(json_string) -> JSON -- 解析JSON(格式错误时返回NULL)
json_object(key1, val1, ...) -> JSON -- 构造JSON对象
json_array(val1, val2, ...) -> JSON -- 构造JSON数组Time Functions
```sql
to_timestamp(seconds) -> TIMESTAMP
to_timestamp_micros(microseconds) -> TIMESTAMP
now() -> TIMESTAMP                   -- volatile, current time
date_part('hour', timestamp) -> INT  -- extract timestamp parts
```
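For example, converting a raw epoch-seconds column and deriving an hour-of-day field (column names are illustrative):

```sql
SELECT
  id,
  to_timestamp(block_timestamp)                    AS block_time,
  date_part('hour', to_timestamp(block_timestamp)) AS block_hour
FROM my_source
```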
### `dynamic_table_check()` — Lookup Table Check

Check if a value exists in a dynamic table (async). Used with `dynamic_table` transforms.

```sql
WHERE dynamic_table_check('tracked_wallets', from_address)
```

### String & Encoding Functions
```sql
_gs_hex_to_byte(hex_string) -> BINARY
_gs_byte_to_hex(bytes) -> VARCHAR
string_to_array(string, delimiter) -> LIST<VARCHAR>
regexp_extract(string, pattern, group_index) -> VARCHAR
regexp_replace(string, pattern, replacement) -> VARCHAR
```
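As a hedged sketch of the regexp and split helpers, where the column names and the padding pattern are illustrative assumptions:

```sql
-- Strip the 24 zero-padding hex chars of a bytes32-encoded address
regexp_replace(topic1, '^0x0{24}', '0x') AS address_from_topic,
-- Split a composite id such as '<tx_hash>_<log_index>' into parts
string_to_array(id, '_') AS id_parts
```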
### Standard SQL Functions
All Apache DataFusion SQL functions are also available: `lower`, `upper`, `trim`, `substring`, `concat`, `replace`, `reverse`, `COALESCE`, `CASE WHEN`, `CAST`, etc.

```sql
-- Common patterns
lower('0xABC...')                            -- case-insensitive address
CONCAT('base', '-', COALESCE(addr, ''))      -- composite keys
COALESCE(balance.contract_address, '')       -- null-safe values
to_timestamp(block_timestamp) AS block_time  -- timestamp conversion
```
## Common Transform Patterns
### 1. Table Aliasing
Use table aliases to reference columns clearly, especially when column names are ambiguous:
```sql
SELECT
  balance.token_type,
  balance.owner_address,
  contract_address AS token_address,
  CAST(`balance` AS STRING) AS balance_amount
FROM my_balances_source balance
WHERE balance.token_type = 'ERC_20'
  AND balance.balance IS NOT NULL
```

Null checks: Use `IS NULL` and `IS NOT NULL` to filter on nullable columns:
```sql
WHERE balance.token_type IS NULL   -- native token (no token type)
WHERE balance.balance IS NOT NULL  -- only rows with a balance value
```
### 2. Simple Filtering (WHERE)
Filter rows from a source by column values:
```yaml
transforms:
  usdc_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM base_transfers
      WHERE address = lower('0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913')
```
### 3. Column Projection and Aliasing
Select and rename specific columns:
```yaml
transforms:
  clean_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        block_number,
        to_timestamp(block_timestamp) AS block_time,
        address AS token_address,
        sender,
        recipient,
        amount
      FROM my_source
```
### 4. Type Casting and Numeric Scaling
Cast values between types. Supported CAST targets include `DOUBLE`, `STRING`, `VARCHAR`, `DECIMAL(p,s)`, `INT`, `BIGINT`.

```sql
-- Convert from raw uint256 to human-readable amounts
(CAST(decoded.event_params[3] AS DOUBLE) / 1e6)  AS amount_usdc  -- 6 decimals (USDC)
(CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount_eth   -- 18 decimals (ETH, most ERC-20s)
(CAST(decoded.event_params[3] AS DOUBLE) / 1e8)  AS amount_btc   -- 8 decimals (WBTC)

-- Cast to STRING to preserve precision for very large numbers (e.g., raw balances)
CAST(`balance` AS STRING) AS balance_amount
```

Common decimal places by token:
| Token | Decimals | Divisor |
|---|---|---|
| USDC, USDT | 6 | `1e6` |
| WBTC | 8 | `1e8` |
| ETH, most ERC-20 | 18 | `1e18` |
### 5. Conditional Logic with CASE WHEN
Derive new columns based on conditions:
```sql
-- Determine trade side based on asset type
CASE WHEN decoded.event_params[4] = '0' THEN 'BUY' ELSE 'SELL' END AS side

-- Conditional amount extraction
(CASE
   WHEN decoded.event_params[4] = '0'
   THEN CAST(decoded.event_params[6] AS DOUBLE)
   ELSE CAST(decoded.event_params[7] AS DOUBLE)
 END / 1e6) AS amount_usdc

-- Classify transaction types
CASE
  WHEN decoded.event_params[3] = '0x4bfb...' THEN 'taker'
  WHEN decoded.event_params[3] = '0xc5d5...' THEN 'taker'
  ELSE 'maker'
END AS order_type
```
### 6. String Manipulation
```sql
-- Prepend '0x' to hex values from decoded params
'0x' || decoded.event_params[4] AS condition_id

-- Static string values
'TRADE' AS tx_type
'' AS placeholder_field
```
-- 为解码参数中的十六进制值添加'0x'前缀
'0x' || decoded.event_params[4] AS condition_id
-- 静态字符串值
'TRADE' AS tx_type
'' AS placeholder_field7. Exclusion Filters
7. 排除过滤
Exclude known contract/system addresses from user-facing data:
```sql
WHERE decoded.event_params[1] NOT IN (
  '0x4bfb41d5b3570defd03c39a9a4d8de6bd8b8982e',
  '0xc5d563a36ae78145c45a50134d48a1215220f80a',
  '0x4d97dcd97ec945f40cf65f87097ace5ea0476045'
)
```
### 8. Price Calculations
Derive price from filled amounts:
```sql
-- price = cost / quantity
(CASE
   WHEN decoded.event_params[4] = '0'
   THEN CAST(decoded.event_params[6] AS DOUBLE)
   ELSE CAST(decoded.event_params[7] AS DOUBLE)
 END / CASE
   WHEN decoded.event_params[4] = '0'
   THEN CAST(decoded.event_params[7] AS DOUBLE)
   ELSE CAST(decoded.event_params[6] AS DOUBLE)
 END) AS price
```
## Advanced Patterns
### Decode Once, Filter Many
When working with raw logs from multiple contract events, decode all events in a single transform and then create separate downstream transforms that filter by `decoded.event_signature`. This is more efficient than creating multiple decode transforms.

```yaml
transforms:
  # Single decode step — include ALL event ABIs
  all_decoded:
    type: sql
    primary_key: id
    sql: |
      SELECT
        _gs_log_decode('[...full ABI with all events...]', topics, `data`) AS `decoded`,
        id, block_number, transaction_hash, address, block_timestamp
      FROM raw_logs_source

  # Downstream: each transform filters for one event type
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, decoded.event_params[1] AS from_addr, ...
      FROM all_decoded
      WHERE decoded.event_signature = 'Transfer'

  approvals:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, decoded.event_params[1] AS owner, ...
      FROM all_decoded
      WHERE decoded.event_signature = 'Approval'
```

Even if you only need one event type downstream, it's fine to include the full ABI — unmatched events are simply ignored by the downstream WHERE filter.
### Transform Chaining
Build multi-step pipelines where each transform reads from the previous one:
```yaml
transforms:
  # Step 1: Decode raw logs
  decoded:
    type: sql
    primary_key: id
    sql: |
      SELECT
        _gs_log_decode('[...]', topics, data) AS decoded,
        id, block_number, transaction_hash, address, block_timestamp
      FROM raw_logs_source

  # Step 2: Extract specific event fields
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id, block_number, block_timestamp, transaction_hash,
        decoded.event_params[1] AS from_address,
        decoded.event_params[2] AS to_address,
        (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount
      FROM decoded
      WHERE decoded.event_signature = 'Transfer'

  # Step 3: Add computed columns
  enriched_transfers:
    type: sql
    primary_key: id
    sql: SELECT *, 'ethereum' AS chain FROM transfers
```
### UNION ALL — Combining Multiple Event Types
Combine multiple transforms with identical schemas into a single output. Every SELECT in the UNION must have the exact same columns in the same order.
```yaml
transforms:
  # Individual event transforms (each with identical output columns)
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash,
             decoded.event_params[1] AS user_id,
             (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount,
             'TRANSFER' AS event_type
      FROM decoded_events
      WHERE decoded.event_signature = 'Transfer'

  approvals:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash,
             decoded.event_params[1] AS user_id,
             (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount,
             'APPROVAL' AS event_type
      FROM decoded_events
      WHERE decoded.event_signature = 'Approval'

  # Combined output
  all_events:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM transfers
      UNION ALL
      SELECT * FROM approvals
```

UNION ALL rules:
- All SELECTs must produce the same number of columns with compatible types
- Use empty strings (`''`) or zero (`0`) as placeholders for columns that don't apply to a particular event type
- `UNION ALL` keeps duplicates (use `UNION` to deduplicate, but `UNION ALL` is preferred for performance)
- You can UNION as many transforms as needed
### Normalizing Disparate Events to a Common Schema
When different events have different fields, map them all to a unified schema using placeholder values:
```yaml
transforms:
  trades:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash, address,
             decoded.event_params[2] AS user_id,
             decoded.event_params[4] AS asset,
             '' AS condition_id,  -- not applicable for trades
             (CAST(decoded.event_params[6] AS DOUBLE) / 1e6) AS amount_usdc,
             'TRADE' AS tx_type,
             CASE WHEN decoded.event_params[4] = '0' THEN 'BUY' ELSE 'SELL' END AS side,
             (CAST(decoded.event_params[8] AS DOUBLE) / 1e6) AS fee
      FROM decoded_events
      WHERE decoded.event_signature = 'OrderFilled'

  redemptions:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash, address,
             decoded.event_params[1] AS user_id,
             '' AS asset,  -- not applicable for redemptions
             '0x' || decoded.event_params[4] AS condition_id,
             (CAST(decoded.event_params[6] AS DOUBLE) / 1e6) AS amount_usdc,
             'REDEEM' AS tx_type,
             '' AS side,  -- not applicable for redemptions
             0 AS fee     -- no fee for redemptions
      FROM decoded_events
      WHERE decoded.event_signature = 'PayoutRedemption'

  all_activities:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM trades
      UNION ALL
      SELECT * FROM redemptions
```
### Adding Columns to an Existing Transform
Extend a transform's output without rewriting it:
```yaml
activities_v2:
  type: sql
  primary_key: id
  sql: SELECT *, '' AS builder FROM all_activities
```

### Source-Level Filtering
For efficiency, filter data at the source before it reaches transforms. Use the `filter:` field on dataset sources to reduce the volume of data processed:

```yaml
sources:
  my_logs:
    type: dataset
    dataset_name: matic.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: >-
      address IN ('0xabc...', '0xdef...')
      AND block_number > 50000000
```

This is significantly more efficient than filtering in a transform because it reduces data at the ingestion layer.
### Bounded Historical Backfill
To process historical data starting from a specific block (not genesis), combine `start_at: earliest` with a `block_number` floor in the filter:

```yaml
sources:
  poly_logs:
    type: dataset
    dataset_name: matic.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: >-
      address IN ('0xabc...', '0xdef...')
      AND block_number >= 82422949
```

This processes all data from block 82422949 onward, rather than from genesis or only latest. Useful when:
- A contract was deployed at a known block
- You only need data from a certain date forward
- You want to avoid processing irrelevant ancient blocks
Combine source filtering with transform filtering:

- Source `filter:` → coarse pre-filtering (contract addresses, block ranges)
- Transform `WHERE` → fine-grained filtering (event types, parameter values, exclusions)
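A hedged sketch combining both layers; the addresses, block number, and the decoded intermediate transform `my_logs_decoded` are illustrative assumptions:

```yaml
sources:
  my_logs:
    type: dataset
    dataset_name: matic.raw_logs
    version: 1.0.0
    start_at: earliest
    # Coarse pre-filter at the ingestion layer
    filter: >-
      address IN ('0xabc...') AND block_number >= 50000000

transforms:
  transfers:
    type: sql
    primary_key: id
    sql: |
      -- Fine-grained filter after decoding (my_logs_decoded is a hypothetical decode step)
      SELECT id, block_number, decoded.event_params[1] AS from_address
      FROM my_logs_decoded
      WHERE decoded.event_signature = 'Transfer'
```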
## Debugging Transforms
### Common Errors
"Unknown source reference"
The clause references a name that doesn't match any source or transform key in the YAML. Check for typos.
FROM"Missing primary_key"
Every transform needs . Almost always use (carried from the source data).
primary_keyid"Column not found"
Column name doesn't exist on the source. Use to see actual column names.
goldsky turbo inspect <pipeline> -n <source_node>Empty results from decoded events
- Verify the ABI JSON matches the actual contract events
- Check matches the event name exactly (case-sensitive)
decoded.event_signature - Ensure filter matches the correct contract
address - Check indexing — parameters are 1-indexed
decoded.event_params[N]
Type mismatch in UNION ALL
All branches of a UNION ALL must have identical column counts and compatible types. Add placeholder columns (, ) where needed.
''0"Unknown source reference"
Inspecting Transform Output

Use the TUI inspector to see data at any node in the pipeline. To inspect a specific transform's output, pass its name with `-n`:

```bash
goldsky turbo inspect <pipeline-name> -n <transform_name>
```

This helps verify that decoded fields, casts, and filters are producing expected results.

---

Complete Example: Multi-Event Activity Feed
This end-to-end example shows how to build a unified activity feed from raw logs. See the template file `templates/multi-event-activity-feed.yaml` for the full working YAML.

Pattern:
- Source raw logs filtered by contract addresses
- Decode all events with `_gs_log_decode()`
- Create individual transforms per event type, each mapping to a common schema
- Combine with `UNION ALL`
- Sink to a database
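The per-event and combining steps can be sketched like this, assuming a decoding transform named `decoded_logs` already exists; event names and the common-schema columns are illustrative:

```yaml
transforms:
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, 'transfer' AS activity_type
      FROM decoded_logs
      WHERE decoded.event_signature = 'Transfer'
  approvals:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, 'approval' AS activity_type
      FROM decoded_logs
      WHERE decoded.event_signature = 'Approval'
  activity_feed:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM transfers
      UNION ALL
      SELECT * FROM approvals
```

Because every per-event transform maps to the same column list, the final `UNION ALL` stays trivially type-compatible.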
Sink Batching Configuration
Database and streaming sinks support batching to tune latency vs throughput:

```yaml
sinks:
  my_sink:
    type: clickhouse
    from: my_transform
    table: my_table
    secret_name: MY_SECRET
    primary_key: id
    batch_size: 1000              # rows per batch
    batch_flush_interval: 300ms   # max time before flushing
```

| Setting | Description | Trade-off |
|---|---|---|
| `batch_size` | Max rows accumulated before flushing | Higher = more throughput |
| `batch_flush_interval` | Max wait time before flushing a batch | Lower = lower latency |

Guidelines:
- Latency-sensitive (real-time dashboards): `batch_flush_interval: 300ms`, `batch_size: 1000`
- Moderate throughput (trade data, events): `batch_flush_interval: 1000ms`, `batch_size: 1000`
- High-volume streaming (balance snapshots): `batch_flush_interval: 10s`, `batch_size: 100000`
TypeScript / WASM Script Transforms
For logic SQL can't express: custom parsing, BigInt arithmetic, stateful processing, or complex conditionals. Schema types: `string`, `uint64`, `int64`, `float64`, `boolean`, `bytes`.

Key rules: define `function invoke(data)`, return `null` to filter a record, return an object matching the schema, no async/await or external imports.

See `references/typescript-transforms.md` for full docs, schema field reference, examples, and the when-to-use-script-vs-SQL table. Also includes Handler transforms for HTTP enrichment via external APIs.
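A minimal sketch of the `invoke` shape described above; the field names (`amount`, `amount_millions`) are illustrative assumptions, not part of any fixed schema:

```typescript
// Keep only large transfers and rescale a decimal-string amount.
// BigInt arithmetic is the kind of logic SQL can't express exactly.
function invoke(data: { id: string; amount: string }) {
  const amount = BigInt(data.amount);      // illustrative input field
  if (amount < 1_000_000n) return null;    // returning null filters the record out
  return {
    id: data.id,
    amount_millions: (amount / 1_000_000n).toString(),
  };
}
```

The returned object must match the transform's declared schema, and no imports or async code are allowed in the script body.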
Dynamic Table Transforms

Updatable lookup tables for allowlists, blocklists, and join-style enrichment — without redeploying. Backed by either PostgreSQL (durable, externally updatable) or in-memory (fast, ephemeral). Use `dynamic_table_check('table_name', column)` in SQL transforms to filter records.

See `references/dynamic-tables.md` for full config, backend options, `dynamic_table_check()` usage, REST API updates, and a complete wallet-tracking example.
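In a SQL transform the allowlist check looks like the sketch below; the transform source (`decoded_transfers`), dynamic table name, and columns are illustrative:

```sql
SELECT id, block_number, from_address, amount
FROM decoded_transfers
-- Keep only rows whose sender appears in the updatable lookup table
WHERE dynamic_table_check('tracked_wallets', from_address)
```

Because the table is updatable at runtime, wallets can be added or removed without redeploying the pipeline.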
Solana Transform Patterns

For decoding Solana instructions with IDL, program-specific decoders (Token, System, Stake, Vote programs), and SPL token tracking.

See `references/solana-patterns.md` for IDL-based decoding, all built-in program decoders, a full example pipeline, and detailed array/JSON/hex function usage examples.
Related

- `/turbo-builder` — Build and deploy pipelines interactively using these transforms
- `/turbo-doctor` — Diagnose and fix pipeline issues (including transform errors)
- `/turbo-pipelines` — Pipeline YAML configuration reference
- `/turbo-architecture` — Architecture decisions (source types, data flow patterns, resource sizing)
- `/datasets` — Blockchain dataset and chain prefix reference
- `/turbo-monitor-debug` — Monitoring and debugging reference