turbo-transforms

Turbo Transforms

Write, understand, and debug SQL, TypeScript, and dynamic table transforms for Turbo pipeline configurations.
Identify what the user needs (decode, filter, reshape, combine, custom logic, or lookup joins), then use the relevant section below. After writing transforms — and especially when generating a complete pipeline YAML rather than just a transform snippet — always validate the full pipeline YAML before presenting it to the user:
```bash
goldsky turbo validate <pipeline.yaml>
```
Reference files for specialized topics:
  • references/typescript-transforms.md — TypeScript/WASM script transforms and handler transforms
  • references/dynamic-tables.md — Dynamic table transforms (allowlists, lookup joins)
  • references/solana-patterns.md — Solana instruction/log decoding and function examples (array, JSON, hex)

Transform Basics

YAML Structure

```yaml
transforms:
  my_transform:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, address
      FROM my_source
      WHERE address = '0xabc...'
```

Required Fields

| Field | Required | Description |
| --- | --- | --- |
| `type` | Yes | `sql`, `script`, `handler`, or `dynamic_table` |
| `primary_key` | Yes | Column used for uniqueness and ordering |
| `sql` | Yes | SQL query (for `sql` type transforms) |

Referencing Data

  • Reference sources by their YAML key name: `FROM my_source`
  • Reference other transforms by their YAML key name: `FROM my_transform`
  • No need for a `from` field in SQL transforms — the `FROM` clause in SQL handles it

SQL Streaming Limitations

Turbo SQL is powered by Apache DataFusion in streaming mode. The following are NOT supported:
  • Joins — use `dynamic_table` transforms for lookup-style joins instead
  • Aggregations (GROUP BY, COUNT, SUM, AVG) — use the `postgres_aggregate` sink instead
  • Window functions (OVER, PARTITION BY, ROW_NUMBER)
  • Subqueries — use transform chaining instead
CTEs (WITH...AS) are supported.
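Since subqueries are unsupported, rewrite them as chained transforms. A minimal sketch (names are illustrative):

```yaml
transforms:
  # What would have been the inner subquery becomes its own transform...
  inner_step:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, address
      FROM my_source

  # ...and the outer query reads from it by key name
  outer_step:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM inner_step
      WHERE block_number > 50000000
```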

The `_gs_op` Column

Every record includes a `_gs_op` column that tracks the operation type: `'i'` (insert), `'u'` (update), `'d'` (delete). Preserve this column through transforms for correct upsert semantics in database sinks.
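For example, when projecting a subset of columns, carry `_gs_op` along explicitly (the other columns listed are illustrative):

```sql
SELECT
  id,
  block_number,
  address,
  _gs_op    -- keep the operation marker so database sinks can upsert/delete correctly
FROM my_source
```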

Goldsky SQL Functions

`evm_log_decode()` — Decode Raw EVM Logs

Decodes raw Ethereum log data into structured event fields using an ABI specification.
Aliases: `_gs_log_decode` and `evm_decode_log` also work. Existing pipelines using `_gs_log_decode` are valid.
Syntax:
```sql
evm_log_decode(abi_json, topics, data) -> STRUCT<name: VARCHAR, event_params: LIST<VARCHAR>>
```
Parameters:
  • `abi_json` — JSON string containing the ABI event definitions (or full contract ABI)
  • `topics` — The `topics` column from `raw_logs`
  • `data` — The `data` column from `raw_logs`
Returns a struct with:
  • `decoded.name` (or `decoded.event_signature` via alias) — Event name (e.g., `'OrderFilled'`, `'Transfer'`)
  • `decoded.event_params[N]` — Positional event parameters (1-indexed, returned as strings)
Example — Decode ERC-20 Transfer events:
```yaml
transforms:
  decoded_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        _gs_log_decode(
          '[{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"Transfer","type":"event"}]',
          topics,
          data
        ) AS decoded,
        id,
        block_number,
        transaction_hash,
        address,
        block_timestamp
      FROM my_raw_logs
```
Then filter by event in a downstream transform:
```yaml
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        block_number,
        block_timestamp,
        transaction_hash,
        address AS token_address,
        decoded.event_params[1] AS from_address,
        decoded.event_params[2] AS to_address,
        (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount
      FROM decoded_events
      WHERE decoded.event_signature = 'Transfer'
```
Tips for `_gs_log_decode()`:
  • The ABI JSON must be a single-line string or use YAML `>-` / `|` block syntax
  • Include all event ABIs you want to decode in a single array — events not matching are ignored
  • Multiple events with the same name but different signatures (e.g., from different contracts) can coexist
  • Use the source's `filter:` field to pre-filter by contract address before decoding (more efficient)
  • Backtick-escape reserved words: several column names conflict with SQL reserved words and must be escaped with backticks: `data`, `decoded`, `balance`, `owner_address`, `id` (in some contexts). When in doubt, backtick-escape column names that could be keywords.
Correct escaping example:
```sql
SELECT
  _gs_log_decode('[...]', topics, `data`) AS `decoded`,
  id, block_number, transaction_hash, address, block_timestamp
FROM my_raw_logs
```

`fetch_abi()` — Fetch ABI/IDL from URL

Fetch an ABI or IDL from a remote URL. Cached internally for performance.
```sql
fetch_abi(url, format) -> VARCHAR
-- format: 'raw' for plain JSON, 'etherscan' for Etherscan API responses
```
Alias: `_gs_fetch_abi`
Example:
```sql
evm_log_decode(
  fetch_abi('https://example.com/erc20.json', 'raw'),
  topics, data
) AS decoded
```

`_gs_keccak256()` — Keccak256 Hash

Compute a Keccak256 hash (same as Solidity's `keccak256`). Returns hex with a `0x` prefix.
```sql
_gs_keccak256('Transfer(address,address,uint256)')
-- 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
```

`xxhash()` — Fast Non-Cryptographic Hash

Useful for generating compact unique IDs from composite values:
```sql
xxhash(concat(transaction_hash, '_', log_index::VARCHAR)) AS unique_id
```
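A common use is synthesizing a `primary_key` when no single column is unique. A sketch (the transform and source names are hypothetical):

```yaml
transforms:
  keyed_logs:
    type: sql
    primary_key: unique_id
    sql: |
      SELECT
        xxhash(concat(transaction_hash, '_', log_index::VARCHAR)) AS unique_id,
        *
      FROM my_raw_logs
```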

U256/I256 — 256-bit Integer Math

For precise token amount arithmetic without JavaScript precision loss.
```sql
-- Convert to/from U256
to_u256(value)              -> U256
u256_to_string(u256_value)  -> VARCHAR

-- Arithmetic (also available: u256_sub, u256_mul, u256_div, u256_mod)
u256_add(a, b) -> U256

-- Automatic operator rewriting: once values are cast to U256/I256,
-- standard operators (+, -, *, /, %) are auto-rewritten to function calls
to_u256(value) / to_u256('1000000000000000000')  -- same as u256_div(...)
```
I256 (signed): `to_i256`, `i256_to_string`, `i256_add`, `i256_sub`, `i256_mul`, `i256_div`, `i256_mod`, `i256_neg`, `i256_abs`
Example — Convert wei to ETH:
```sql
u256_to_string(
  to_u256(evt.event_params[3]) / to_u256('1000000000000000000')
) AS amount_eth
```

Solana Decode Functions

```sql
-- Decode instruction data using IDL (returns STRUCT<name, value>)
_gs_decode_instruction_data(idl_json, data)

-- Decode program log messages using IDL
_gs_decode_log_message(idl_json, log_messages)

-- Program-specific decoders:
gs_solana_decode_token_program_instruction(data)
gs_solana_decode_system_program_instruction(data)
gs_solana_get_accounts(transaction_data)
gs_solana_get_balance_changes(transaction_data)
gs_solana_decode_associated_token_program_instruction(data)
gs_solana_decode_stake_program_instruction(data)
gs_solana_decode_vote_program_instruction(data)

-- Base58 decoding
_gs_from_base58(base58_string) -> BINARY
```
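As an illustrative sketch only (see references/solana-patterns.md for authoritative patterns; the source and column names here are assumed, not from a real dataset):

```yaml
transforms:
  token_instructions:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        block_slot,   -- assumed column name
        gs_solana_decode_token_program_instruction(`data`) AS decoded
      FROM my_solana_instructions
```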

Array Functions

```sql
-- Filter array elements by struct field matching a list of values
-- (prevents overflow panics by filtering BEFORE unnest)
array_filter_in(array, field_name, values_list) -> LIST

-- Convert list to large-list (i64 offsets) for very large arrays
to_large_list(array) -> LARGE_LIST

-- Filter array by field value
array_filter(array, field_name, value) -> LIST

-- Get first matching element
array_filter_first(array, field_name, value) -> STRUCT

-- Add index to each element
array_enumerate(array) -> LIST<STRUCT<index, value>>

-- Combine multiple arrays element-wise
zip_arrays(array1, array2, ...) -> LIST<STRUCT>
```
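A hedged sketch of how these compose — filtering a struct array before unnesting, assuming an `instructions` column whose elements carry a `program_id` field (the program ID shown is the SPL Token program; the source name is hypothetical):

```sql
SELECT
  id,
  -- keep only matching instructions BEFORE any unnest
  array_filter(instructions, 'program_id',
    'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA') AS token_instructions,
  -- or grab just the first matching element as a struct
  array_filter_first(instructions, 'program_id',
    'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA') AS first_token_instruction
FROM my_transactions
```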

JSON Functions

```sql
json_query(json_string, path) -> VARCHAR       -- Query JSON by path
json_value(json_string, path) -> VARCHAR       -- Extract scalar value
json_exists(json_string, path) -> BOOLEAN      -- Check path exists
is_json(string) -> BOOLEAN                     -- Validate JSON
parse_json(json_string) -> JSON                -- Parse (errors on invalid)
try_parse_json(json_string) -> JSON            -- Parse (NULL on invalid)
json_object(key1, val1, ...) -> JSON           -- Construct JSON object
json_array(val1, val2, ...) -> JSON            -- Construct JSON array
```
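A sketch extracting fields from a JSON string column (the `metadata` column and paths are hypothetical):

```sql
SELECT
  id,
  json_value(metadata, '$.name') AS token_name,        -- scalar extraction
  json_query(metadata, '$.attributes') AS attributes,  -- nested structure
  is_json(metadata) AS has_valid_metadata              -- validity flag
FROM my_source
WHERE json_exists(metadata, '$.name')
```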

Time Functions

```sql
to_timestamp(seconds) -> TIMESTAMP
to_timestamp_micros(microseconds) -> TIMESTAMP
now() -> TIMESTAMP                              -- volatile, current time
date_part('hour', timestamp) -> INT             -- extract timestamp parts
```
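For example, deriving timestamp columns (assuming `block_timestamp` is in epoch seconds):

```sql
SELECT
  id,
  to_timestamp(block_timestamp) AS block_time,
  date_part('hour', to_timestamp(block_timestamp)) AS block_hour
FROM my_source
```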

`dynamic_table_check()` — Lookup Table Check

Check if a value exists in a dynamic table (async). Used with `dynamic_table` transforms.
```sql
WHERE dynamic_table_check('tracked_wallets', from_address)
```
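A sketch of the check inside a full transform (`tracked_wallets` would be defined by a separate `dynamic_table` transform — see references/dynamic-tables.md; the `transfers` input is hypothetical):

```yaml
transforms:
  tracked_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM transfers
      WHERE dynamic_table_check('tracked_wallets', from_address)
```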

String & Encoding Functions

```sql
_gs_hex_to_byte(hex_string) -> BINARY
_gs_byte_to_hex(bytes) -> VARCHAR
string_to_array(string, delimiter) -> LIST<VARCHAR>
regexp_extract(string, pattern, group_index) -> VARCHAR
regexp_replace(string, pattern, replacement) -> VARCHAR
```
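For example (column names and the regex are illustrative, not from a real schema):

```sql
SELECT
  _gs_byte_to_hex(_gs_hex_to_byte(transaction_hash)) AS normalized_hash,  -- hex -> bytes -> hex
  string_to_array(topics, ',') AS topic_list,                             -- split a delimited string
  regexp_extract(id, '^(0x[0-9a-f]+)', 1) AS leading_hash                 -- first capture group
FROM my_raw_logs
```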

Standard SQL Functions

All Apache DataFusion SQL functions are also available: `lower`, `upper`, `trim`, `substring`, `concat`, `replace`, `reverse`, `COALESCE`, `CASE WHEN`, `CAST`, etc.
```sql
-- Common patterns
lower('0xABC...')                              -- case-insensitive address
CONCAT('base', '-', COALESCE(addr, ''))        -- composite keys
COALESCE(balance.contract_address, '')         -- null-safe values
to_timestamp(block_timestamp) AS block_time    -- timestamp conversion
```

Common Transform Patterns

1. Table Aliasing

Use table aliases to reference columns clearly, especially when column names are ambiguous:
```sql
SELECT
  balance.token_type,
  balance.owner_address,
  contract_address AS token_address,
  CAST(`balance` AS STRING) AS balance_amount
FROM my_balances_source balance
WHERE balance.token_type = 'ERC_20'
  AND balance.balance IS NOT NULL
```
Null checks: use `IS NULL` and `IS NOT NULL` to filter on nullable columns:
```sql
WHERE balance.token_type IS NULL    -- native token (no token type)
WHERE balance.balance IS NOT NULL   -- only rows with a balance value
```

2. Simple Filtering (WHERE)

Filter rows from a source by column values:
```yaml
transforms:
  usdc_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM base_transfers
      WHERE address = lower('0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913')
```

3. Column Projection and Aliasing

Select and rename specific columns:
```yaml
transforms:
  clean_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        block_number,
        to_timestamp(block_timestamp) AS block_time,
        address AS token_address,
        sender,
        recipient,
        amount
      FROM my_source
```

4. Type Casting and Numeric Scaling

Cast values between types. Supported CAST targets include `DOUBLE`, `STRING`, `VARCHAR`, `DECIMAL(p,s)`, `INT`, `BIGINT`.
```sql
-- Convert from raw uint256 to human-readable amounts
(CAST(decoded.event_params[3] AS DOUBLE) / 1e6) AS amount_usdc    -- 6 decimals (USDC)
(CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount_eth    -- 18 decimals (ETH, most ERC-20s)
(CAST(decoded.event_params[3] AS DOUBLE) / 1e8) AS amount_btc     -- 8 decimals (WBTC)

-- Cast to STRING to preserve precision for very large numbers (e.g., raw balances)
CAST(`balance` AS STRING) AS balance_amount
```
Common decimal places by token:

| Token | Decimals | Divisor |
| --- | --- | --- |
| USDC, USDT | 6 | 1e6 |
| WBTC | 8 | 1e8 |
| ETH, most ERC-20s | 18 | 1e18 |
1e18
5. Conditional Logic with CASE WHEN

Derive new columns based on conditions:
```sql
-- Determine trade side based on asset type
CASE WHEN decoded.event_params[4] = '0' THEN 'BUY' ELSE 'SELL' END AS side

-- Conditional amount extraction
(CASE
  WHEN decoded.event_params[4] = '0'
  THEN CAST(decoded.event_params[6] AS DOUBLE)
  ELSE CAST(decoded.event_params[7] AS DOUBLE)
END / 1e6) AS amount_usdc

-- Classify transaction types
CASE
  WHEN decoded.event_params[3] = '0x4bfb...' THEN 'taker'
  WHEN decoded.event_params[3] = '0xc5d5...' THEN 'taker'
  ELSE 'maker'
END AS order_type
```

6. String Manipulation

```sql
-- Prepend '0x' to hex values from decoded params
'0x' || decoded.event_params[4] AS condition_id

-- Static string values
'TRADE' AS tx_type
'' AS placeholder_field
```

7. Exclusion Filters

Exclude known contract/system addresses from user-facing data:
```sql
WHERE decoded.event_params[1] NOT IN (
  '0x4bfb41d5b3570defd03c39a9a4d8de6bd8b8982e',
  '0xc5d563a36ae78145c45a50134d48a1215220f80a',
  '0x4d97dcd97ec945f40cf65f87097ace5ea0476045'
)
```

8. Price Calculations

Derive price from filled amounts:
```sql
-- price = cost / quantity
(CASE
  WHEN decoded.event_params[4] = '0'
  THEN CAST(decoded.event_params[6] AS DOUBLE)
  ELSE CAST(decoded.event_params[7] AS DOUBLE)
END / CASE
  WHEN decoded.event_params[4] = '0'
  THEN CAST(decoded.event_params[7] AS DOUBLE)
  ELSE CAST(decoded.event_params[6] AS DOUBLE)
END) AS price
```
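The divisor can be zero for malformed or zero-quantity fills, so a defensive variant wraps it in `NULLIF`, yielding NULL instead of an error (a sketch, simplified to one branch):

```sql
(CAST(decoded.event_params[6] AS DOUBLE)
  / NULLIF(CAST(decoded.event_params[7] AS DOUBLE), 0)) AS price
```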

Advanced Patterns

Decode Once, Filter Many

When working with raw logs from multiple contract events, decode all events in a single transform and then create separate downstream transforms that filter by `decoded.event_signature`. This is more efficient than creating multiple decode transforms.
```yaml
transforms:
  # Single decode step — include ALL event ABIs
  all_decoded:
    type: sql
    primary_key: id
    sql: |
      SELECT
        _gs_log_decode('[...full ABI with all events...]', topics, `data`) AS `decoded`,
        id, block_number, transaction_hash, address, block_timestamp
      FROM raw_logs_source

  # Downstream: each transform filters for one event type
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, decoded.event_params[1] AS from_addr, ...
      FROM all_decoded
      WHERE decoded.event_signature = 'Transfer'

  approvals:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, decoded.event_params[1] AS owner, ...
      FROM all_decoded
      WHERE decoded.event_signature = 'Approval'
```
Even if you only need one event type downstream, it's fine to include the full ABI — unmatched events are simply ignored by the downstream WHERE filter.

Transform Chaining

Build multi-step pipelines where each transform reads from the previous one:
```yaml
transforms:
  # Step 1: Decode raw logs
  decoded:
    type: sql
    primary_key: id
    sql: |
      SELECT
        _gs_log_decode('[...]', topics, data) AS decoded,
        id, block_number, transaction_hash, address, block_timestamp
      FROM raw_logs_source

  # Step 2: Extract specific event fields
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id, block_number, block_timestamp, transaction_hash,
        decoded.event_params[1] AS from_address,
        decoded.event_params[2] AS to_address,
        (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount
      FROM decoded
      WHERE decoded.event_signature = 'Transfer'

  # Step 3: Add computed columns
  enriched_transfers:
    type: sql
    primary_key: id
    sql: SELECT *, 'ethereum' AS chain FROM transfers
```

UNION ALL — Combining Multiple Event Types

Combine multiple transforms with identical schemas into a single output. Every SELECT in the UNION must have the exact same columns in the same order.
```yaml
transforms:
  # Individual event transforms (each with identical output columns)
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash,
        decoded.event_params[1] AS user_id,
        (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount,
        'TRANSFER' AS event_type
      FROM decoded_events
      WHERE decoded.event_signature = 'Transfer'

  approvals:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash,
        decoded.event_params[1] AS user_id,
        (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount,
        'APPROVAL' AS event_type
      FROM decoded_events
      WHERE decoded.event_signature = 'Approval'

  # Combined output
  all_events:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM transfers
      UNION ALL
      SELECT * FROM approvals
```
UNION ALL rules:
  • All SELECTs must produce the same number of columns with compatible types
  • Use empty strings (`''`) or zero (`0`) as placeholders for columns that don't apply to a particular event type
  • `UNION ALL` keeps duplicates (use `UNION` to deduplicate, but `UNION ALL` is preferred for performance)
  • You can UNION as many transforms as needed

Normalizing Disparate Events to a Common Schema

When different events have different fields, map them all to a unified schema using placeholder values:
```yaml
transforms:
  trades:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash, address,
        decoded.event_params[2] AS user_id,
        decoded.event_params[4] AS asset,
        '' AS condition_id,                    -- not applicable for trades
        (CAST(decoded.event_params[6] AS DOUBLE) / 1e6) AS amount_usdc,
        'TRADE' AS tx_type,
        CASE WHEN decoded.event_params[4] = '0' THEN 'BUY' ELSE 'SELL' END AS side,
        (CAST(decoded.event_params[8] AS DOUBLE) / 1e6) AS fee
      FROM decoded_events
      WHERE decoded.event_signature = 'OrderFilled'

  redemptions:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash, address,
        decoded.event_params[1] AS user_id,
        '' AS asset,                           -- not applicable for redemptions
        '0x' || decoded.event_params[4] AS condition_id,
        (CAST(decoded.event_params[6] AS DOUBLE) / 1e6) AS amount_usdc,
        'REDEEM' AS tx_type,
        '' AS side,                            -- not applicable for redemptions
        0 AS fee                               -- no fee for redemptions
      FROM decoded_events
      WHERE decoded.event_signature = 'PayoutRedemption'

  all_activities:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM trades
      UNION ALL
      SELECT * FROM redemptions
```

Adding Columns to an Existing Transform

为现有转换添加列

Extend a transform's output without rewriting it:
yaml
  activities_v2:
    type: sql
    primary_key: id
    sql: SELECT *, '' AS builder FROM all_activities

无需重写转换即可扩展其输出:
yaml
  activities_v2:
    type: sql
    primary_key: id
    sql: SELECT *, '' AS builder FROM all_activities

Source-Level Filtering

数据源级过滤

For efficiency, filter data at the source before it reaches transforms. Use the
filter:
field on dataset sources to reduce the volume of data processed:
yaml
sources:
  my_logs:
    type: dataset
    dataset_name: matic.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: >-
      address IN ('0xabc...', '0xdef...')
      AND block_number > 50000000
This is significantly more efficient than filtering in a transform because it reduces data at the ingestion layer.
为提升效率,在数据到达转换步骤前在数据源层过滤数据。在数据集数据源上使用
filter:
字段,减少需要处理的数据量:
yaml
sources:
  my_logs:
    type: dataset
    dataset_name: matic.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: >-
      address IN ('0xabc...', '0xdef...')
      AND block_number > 50000000
这比在转换中过滤高效得多,因为它在数据摄入层就减少了数据量。

Bounded Historical Backfill

有限历史回填

To process historical data starting from a specific block (not genesis), combine
start_at: earliest
with a
block_number
floor in the filter:
yaml
sources:
  poly_logs:
    type: dataset
    dataset_name: matic.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: >-
      address IN ('0xabc...', '0xdef...')
      AND block_number >= 82422949
This processes all data from block 82422949 onward, rather than from genesis or only latest. Useful when:
  • A contract was deployed at a known block
  • You only need data from a certain date forward
  • You want to avoid processing irrelevant ancient blocks
Combine source filtering with transform filtering:
  • Source
    filter:
    → coarse pre-filtering (contract addresses, block ranges)
  • Transform
    WHERE
    → fine-grained filtering (event types, parameter values, exclusions)
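The two layers above can be sketched together in one pipeline fragment. This is an illustrative sketch, not a working pipeline: the addresses, block floor, and event name are placeholders, and it assumes a `decoded_events` decode transform over `my_logs` like the ones shown earlier.

```yaml
sources:
  my_logs:
    type: dataset
    dataset_name: matic.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: >-                 # coarse: contract addresses + block floor
      address IN ('0xabc...', '0xdef...')
      AND block_number >= 50000000

transforms:
  order_fills:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM decoded_events      -- assumes a decode transform over my_logs
      WHERE decoded.event_signature = 'OrderFilled'
        AND CAST(decoded.event_params[6] AS DOUBLE) > 0
```

The source `filter:` cuts volume at ingestion; the transform `WHERE` then handles the event-level conditions the source filter cannot see.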

如需从特定区块(而非创世区块)开始处理历史数据,将
start_at: earliest
与过滤器中的
block_number
下限结合使用:
yaml
sources:
  poly_logs:
    type: dataset
    dataset_name: matic.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: >-
      address IN ('0xabc...', '0xdef...')
      AND block_number >= 82422949
这会处理从区块82422949开始的所有数据,而非创世区块或仅最新数据。适用于以下场景:
  • 合约在已知区块部署
  • 你只需要特定日期之后的数据
  • 你想避免处理无关的古老区块
结合数据源过滤与转换过滤:
  • 数据源
    filter:
    → 粗粒度预过滤(合约地址、区块范围)
  • 转换
    WHERE
    → 细粒度过滤(事件类型、参数值、排除项)

Debugging Transforms

调试转换逻辑

Common Errors

常见错误

"Unknown source reference" The
FROM
clause references a name that doesn't match any source or transform key in the YAML. Check for typos.
"Missing primary_key" Every transform needs
primary_key
. Almost always use
id
(carried from the source data).
"Column not found" Column name doesn't exist on the source. Use
goldsky turbo inspect <pipeline> -n <source_node>
to see actual column names.
Empty results from decoded events
  • Verify the ABI JSON matches the actual contract events
  • Check
    decoded.event_signature
    matches the event name exactly (case-sensitive)
  • Ensure
    address
    filter matches the correct contract
  • Check
    decoded.event_params[N]
    indexing — parameters are 1-indexed
Type mismatch in UNION ALL All branches of a UNION ALL must have identical column counts and compatible types. Add placeholder columns (
''
,
0
) where needed.
"Unknown source reference"
FROM
子句引用的名称与YAML中的任何数据源或转换键不匹配。检查拼写错误。
"Missing primary_key" 每个转换都需要
primary_key
。几乎总是使用
id
(从数据源继承)。
"Column not found" 列名在数据源中不存在。使用
goldsky turbo inspect <pipeline> -n <source_node>
查看实际列名。
解码事件无结果
  • 验证ABI JSON与实际合约事件匹配
  • 检查
    decoded.event_signature
    与事件名称完全匹配(区分大小写)
  • 确保
    address
    过滤器匹配正确的合约
  • 检查
    decoded.event_params[N]
    索引 — 参数是从1开始索引
UNION ALL类型不匹配 UNION ALL的所有分支必须具有相同的列数和兼容的类型。必要时添加占位符列(
''
、
0
)。

Inspecting Transform Output

检查转换输出

Use the TUI inspector to see data at any node in the pipeline:
bash
# Inspect a specific transform's output
goldsky turbo inspect <pipeline-name> -n <transform_name>
This helps verify that decoded fields, casts, and filters are producing expected results.

---
使用TUI检查器查看管道中任意节点的数据:
bash
# 检查特定转换的输出
goldsky turbo inspect <pipeline-name> -n <transform_name>
这有助于验证解码字段、类型转换和过滤器是否生成了预期结果。

---

Complete Example: Multi-Event Activity Feed

完整示例:多事件活动流

This end-to-end example shows how to build a unified activity feed from raw logs. See the template file
templates/multi-event-activity-feed.yaml
for the full working YAML.
Pattern:
  1. Source raw logs filtered by contract addresses
  2. Decode all events with
    _gs_log_decode()
  3. Create individual transforms per event type, each mapping to a common schema
  4. Combine with
    UNION ALL
  5. Sink to a database

这个端到端示例展示了如何从原始日志构建统一的活动流。完整可用YAML请查看模板文件
templates/multi-event-activity-feed.yaml
。
模式:
  1. 按合约地址过滤的原始日志数据源
  2. 使用
    _gs_log_decode()
    解码所有事件
  3. 为每种事件类型创建单独的转换,映射到通用Schema
  4. 通过
    UNION ALL
    合并
  5. 输出到数据库

Sink Batching Configuration

输出端批处理配置

Database and streaming sinks support batching to tune latency vs throughput:
yaml
sinks:
  my_sink:
    type: clickhouse
    from: my_transform
    table: my_table
    secret_name: MY_SECRET
    primary_key: id
    batch_size: 1000           # rows per batch
    batch_flush_interval: 300ms # max time before flushing
Setting                 Description                              Trade-off
batch_size              Max rows accumulated before flushing     Higher = more throughput
batch_flush_interval    Max wait time before flushing a batch    Lower = lower latency
Guidelines:
  • Latency-sensitive (real-time dashboards):
    batch_flush_interval: 300ms
    ,
    batch_size: 1000
  • Moderate throughput (trade data, events):
    batch_flush_interval: 1000ms
    ,
    batch_size: 1000
  • High-volume streaming (balance snapshots):
    batch_flush_interval: 10s
    ,
    batch_size: 100000

数据库和流式输出端支持批处理配置,以调整延迟与吞吐量的平衡:
yaml
sinks:
  my_sink:
    type: clickhouse
    from: my_transform
    table: my_table
    secret_name: MY_SECRET
    primary_key: id
    batch_size: 1000           # 每批行数
    batch_flush_interval: 300ms # 批处理刷新的最长等待时间
设置                    描述                            权衡
batch_size              刷新前累积的最大行数            数值越高,吞吐量越大
batch_flush_interval    批处理刷新的最长等待时间        数值越低,延迟越低
配置指南:
  • 低延迟敏感(实时仪表盘):
    batch_flush_interval: 300ms
    batch_size: 1000
  • 中等吞吐量(交易数据、事件):
    batch_flush_interval: 1000ms
    batch_size: 1000
  • 高容量流式处理(余额快照):
    batch_flush_interval: 10s
    batch_size: 100000

TypeScript / WASM Script Transforms

TypeScript / WASM脚本转换

For logic SQL can't express: custom parsing, BigInt arithmetic, stateful processing, or complex conditionals. Schema types:
string
,
uint64
,
int64
,
float64
,
boolean
,
bytes
.
Key rules: define
function invoke(data)
, return
null
to filter a record, return an object matching the schema, no async/await or external imports.
See
references/typescript-transforms.md
for full docs, schema field reference, examples, and the when-to-use-script-vs-SQL table. Also includes Handler transforms for HTTP enrichment via external APIs.
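The rules above can be sketched as a minimal script transform. This is an illustrative sketch only: the input fields (`amount_raw`, `to_address`) and the output columns are hypothetical, not a real Turbo schema.

```typescript
// Hypothetical input record shape; real columns come from your source.
interface InputRecord {
  id: string;
  amount_raw: string;   // large uint256 amounts typically arrive as strings
  to_address: string;
}

// Every script transform defines invoke(data); no async/await, no imports.
function invoke(data: InputRecord) {
  const amount = BigInt(data.amount_raw); // BigInt math SQL can't do safely
  if (amount < 1_000_000n) {
    return null; // returning null drops the record from the output
  }
  // Return an object matching the transform's declared output schema.
  return {
    id: data.id,                                 // string
    to_address: data.to_address,                 // string
    amount_tokens: Number(amount / 1_000_000n),  // float64
  };
}
```

The matching output schema (`string`/`float64` fields) must still be declared in the transform's YAML; see the reference file for the exact shape.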

用于SQL无法表达的逻辑:自定义解析、BigInt运算、有状态处理或复杂条件判断。Schema类型包括:
string
uint64
int64
float64
boolean
bytes
核心规则:定义
function invoke(data)
,返回
null
以过滤记录,返回与Schema匹配的对象,不支持async/await或外部导入。
完整文档、Schema字段参考、示例以及何时使用脚本vs SQL的对比表,请查看
references/typescript-transforms.md
。还包括通过外部API进行HTTP增强的处理器转换。

Dynamic Table Transforms

动态表转换

Updatable lookup tables for allowlists, blocklists, and join-style enrichment — without redeploying. Backed by either PostgreSQL (durable, externally updatable) or in-memory (fast, ephemeral). Use
dynamic_table_check('table_name', column)
in SQL transforms to filter records.
See
references/dynamic-tables.md
for full config, backend options,
dynamic_table_check()
usage, REST API updates, and a complete wallet-tracking example.
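A minimal sketch of the allowlist pattern (the table name `tracked_wallets` and the source name `my_logs` are illustrative):

```yaml
transforms:
  tracked_only:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM my_logs
      WHERE dynamic_table_check('tracked_wallets', address)
```

Rows pass through only while `address` is present in the dynamic table, which can be updated at runtime without redeploying the pipeline.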

可更新的查找表,用于白名单、黑名单和连接式数据增强 — 无需重新部署。支持PostgreSQL(持久化、可外部更新)或内存(快速、临时)作为后端。在SQL转换中使用
dynamic_table_check('table_name', column)
过滤记录。
完整配置、后端选项、
dynamic_table_check()
用法、REST API更新以及完整钱包跟踪示例,请查看
references/dynamic-tables.md

Solana Transform Patterns

Solana转换模式

For decoding Solana instructions with an IDL, using program-specific decoders (Token, System, Stake, and Vote programs), and tracking SPL tokens.
See
references/solana-patterns.md
for IDL-based decoding, all built-in program decoders, a full example pipeline, and detailed array/JSON/hex function usage examples.

用于使用IDL解码Solana指令、特定程序解码器(Token、System、Stake、Vote程序)以及SPL代币跟踪。
基于IDL的解码、所有内置程序解码器、完整示例管道以及数组/JSON/十六进制函数的详细用法示例,请查看
references/solana-patterns.md

Related

相关资源

  • /turbo-builder
    — Build and deploy pipelines interactively using these transforms
  • /turbo-doctor
    — Diagnose and fix pipeline issues (including transform errors)
  • /turbo-pipelines
    — Pipeline YAML configuration reference
  • /turbo-architecture
    — Architecture decisions (source types, data flow patterns, resource sizing)
  • /datasets
    — Blockchain dataset and chain prefix reference
  • /turbo-monitor-debug
    — Monitoring and debugging reference
  • /turbo-builder
    — 使用这些转换逻辑交互式构建和部署管道
  • /turbo-doctor
    — 诊断并修复管道问题(包括转换错误)
  • /turbo-pipelines
    — 管道YAML配置参考
  • /turbo-architecture
— 架构决策(数据源类型、数据流模式、资源规模)
  • /datasets
    — 区块链数据集和链前缀参考
  • /turbo-monitor-debug
    — 监控与调试参考