Write SQL, TypeScript, and dynamic table transforms for Goldsky Turbo pipelines. Use this skill for: decoding EVM event logs with _gs_log_decode (requires ABI) or transaction inputs with _gs_tx_decode, filtering and casting blockchain data in SQL, combining multiple decoded event types into one table with UNION ALL, writing TypeScript/WASM transforms using the invoke(data) function signature, setting up dynamic lookup tables to filter transfers by a wallet list you update at runtime (dynamic_table_check), chaining SQL and TypeScript steps together, or debugging null values in decoded fields. For full pipeline YAML structure, use /turbo-pipelines instead. For building an entire pipeline end-to-end, use /turbo-builder instead.
Install this skill:

```shell
npx skill4agent add goldsky-io/goldsky-agent turbo-transforms
```

Validate any pipeline with `goldsky turbo validate <pipeline.yaml>`. Deeper reference material lives in `references/typescript-transforms.md`, `references/dynamic-tables.md`, and `references/solana-patterns.md`.

## SQL transforms

```yaml
transforms:
  my_transform:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, address
      FROM my_source
      WHERE address = '0xabc...'
```

| Field | Required | Description |
|---|---|---|
| `type` | Yes | Transform type (`sql`) |
| `primary_key` | Yes | Column used for uniqueness and ordering |
| `sql` | Yes | SQL query (for `sql` transforms) |

`FROM my_source` reads from a source; `FROM my_transform` reads from another transform — the name after `FROM` can reference either. Other transform types include `dynamic_table` and `postgres_aggregate`. Every row carries a `_gs_op` metadata column whose value is `'i'` (insert), `'u'` (update), or `'d'` (delete).

## evm_log_decode()

Aliases: `_gs_log_decode` also works, and existing pipelines using `evm_decode_log` remain valid. The examples below use `_gs_log_decode`.
Signature:

```
evm_log_decode(abi_json, topics, data) -> STRUCT<name: VARCHAR, event_params: LIST<VARCHAR>>
```

- `abi_json` — the event ABI as a JSON string
- `topics` — the `topics` column from `raw_logs`
- `data` — the `data` column from `raw_logs`

The result exposes `decoded.name` (also available as `decoded.event_signature`), the event name such as `'OrderFilled'` or `'Transfer'`, and `decoded.event_params[N]`, the Nth parameter in ABI order (1-indexed).

```yaml
transforms:
  decoded_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        _gs_log_decode(
          '[{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"Transfer","type":"event"}]',
          topics,
          data
        ) AS decoded,
        id,
        block_number,
        transaction_hash,
        address,
        block_timestamp
      FROM my_raw_logs
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        block_number,
        block_timestamp,
        transaction_hash,
        address AS token_address,
        decoded.event_params[1] AS from_address,
        decoded.event_params[2] AS to_address,
        (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount
      FROM decoded_events
      WHERE decoded.event_signature = 'Transfer'
```

When calling `_gs_log_decode()` in SQL embedded in YAML (including `>-` folded `filter:` blocks), backtick-quote identifiers that collide with reserved words — `data`, `decoded`, `balance`, `owner_address`, `id` — for example:
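A common pitfall with `decoded.event_params[N]` is the index base. A tiny TypeScript sketch of the mapping (the `transferInputs` array and `sqlParamIndex` helper are illustrative, not Goldsky APIs):

```typescript
// Illustration (assumed shapes): event_params follows the ABI "inputs"
// order, and SQL list indexing starts at 1 while TS arrays start at 0.
const transferInputs: string[] = ["from", "to", "value"]; // ABI input order

// Map an ABI input name to the N used in decoded.event_params[N]
function sqlParamIndex(name: string): number {
  const i = transferInputs.indexOf(name);
  if (i < 0) throw new Error(`unknown input: ${name}`);
  return i + 1; // SQL lists are 1-indexed
}

console.log(sqlParamIndex("from"));  // 1
console.log(sqlParamIndex("value")); // 3
```

So in the Transfer example, `event_params[1]` is `from`, `event_params[2]` is `to`, and `event_params[3]` is `value`.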
```sql
SELECT
  _gs_log_decode('[...]', topics, `data`) AS `decoded`,
  id, block_number, transaction_hash, address, block_timestamp
FROM my_raw_logs
```

## fetch_abi()

```
fetch_abi(url, format) -> VARCHAR
-- format: 'raw' for plain JSON, 'etherscan' for Etherscan API responses
```

`_gs_fetch_abi` is an alias. Example:

```sql
evm_log_decode(
  fetch_abi('https://example.com/erc20.json', 'raw'),
  topics, data
) AS decoded
```

## _gs_keccak256()

Computes a `keccak256` hash and returns it `0x`-prefixed — useful for deriving event topic hashes:

```sql
_gs_keccak256('Transfer(address,address,uint256)')
-- 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
```

## xxhash()

A fast non-cryptographic hash, handy for synthetic unique keys:

```sql
xxhash(concat(transaction_hash, '_', log_index::VARCHAR)) AS unique_id
```

## U256 / I256 arithmetic
```sql
-- Convert to/from U256
to_u256(value) -> U256
u256_to_string(u256_value) -> VARCHAR
-- Arithmetic (also available: u256_sub, u256_mul, u256_div, u256_mod)
u256_add(a, b) -> U256
-- Automatic operator rewriting: once values are cast to U256/I256,
-- standard operators (+, -, *, /, %) are auto-rewritten to function calls
to_u256(value) / to_u256('1000000000000000000') -- same as u256_div(...)
```

Signed equivalents: `to_i256`, `i256_to_string`, `i256_add`, `i256_sub`, `i256_mul`, `i256_div`, `i256_mod`, `i256_neg`, `i256_abs`.

```sql
u256_to_string(
  to_u256(evt.event_params[3]) / to_u256('1000000000000000000')
) AS amount_eth
```
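Why these helpers exist: a SQL `DOUBLE` (like a JS `number`) holds only about 53 bits of integer precision, while raw token amounts are 256-bit. A plain-TypeScript sketch using native `BigInt` to mirror the exact-integer behavior of `u256_div`/`u256_mod` (the constants are illustrative):

```typescript
// Sketch: exact big-integer math with BigInt, mirroring u256_div / u256_mod.
const WEI_PER_ETH = 10n ** 18n;
const raw = 123456789012345678901234567890n; // a raw uint256 amount

// Integer division and remainder, as u256_div / u256_mod would compute
const wholeEth = raw / WEI_PER_ETH;  // 123456789012n
const remainder = raw % WEI_PER_ETH; // 345678901234567890n

// Round-trip through strings, like u256_to_string / to_u256
const roundTrip = BigInt(raw.toString());

// A float (SQL DOUBLE) cannot represent this value exactly
const asDouble = Number(raw);
console.log(BigInt(asDouble) === raw); // false — precision was lost
```

This is why large raw balances should stay as strings or U256 values until the final human-readable scaling step.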
## Solana decode functions

```sql
-- Decode instruction data using IDL (returns STRUCT<name, value>)
_gs_decode_instruction_data(idl_json, data)
-- Decode program log messages using IDL
_gs_decode_log_message(idl_json, log_messages)
-- Program-specific decoders:
gs_solana_decode_token_program_instruction(data)
gs_solana_decode_system_program_instruction(data)
gs_solana_get_accounts(transaction_data)
gs_solana_get_balance_changes(transaction_data)
gs_solana_decode_associated_token_program_instruction(data)
gs_solana_decode_stake_program_instruction(data)
gs_solana_decode_vote_program_instruction(data)
-- Base58 decoding
_gs_from_base58(base58_string) -> BINARY
```

## Array helpers
```sql
-- Filter array elements by struct field matching a list of values
-- (prevents overflow panics by filtering BEFORE unnest)
array_filter_in(array, field_name, values_list) -> LIST
-- Convert list to large-list (i64 offsets) for very large arrays
to_large_list(array) -> LARGE_LIST
-- Filter array by field value
array_filter(array, field_name, value) -> LIST
-- Get first matching element
array_filter_first(array, field_name, value) -> STRUCT
-- Add index to each element
array_enumerate(array) -> LIST<STRUCT<index, value>>
-- Combine multiple arrays element-wise
zip_arrays(array1, array2, ...) -> LIST<STRUCT>
```
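The semantics of `array_filter_in` can be pictured in plain TypeScript (a hypothetical model, not the engine's implementation): keep only the struct elements whose named field is in an allow-list, before any unnest.

```typescript
// Model of array_filter_in semantics: allow-list filtering of struct
// elements by one field. Names and data here are illustrative only.
type Row = Record<string, string>;

function arrayFilterIn(arr: Row[], field: string, values: string[]): Row[] {
  const allowed = new Set(values);
  return arr.filter((el) => allowed.has(el[field]));
}

const balances: Row[] = [
  { owner: "0xaaa", amount: "10" },
  { owner: "0xbbb", amount: "20" },
  { owner: "0xccc", amount: "30" },
];

// Analogous to array_filter_in(balances, 'owner', ['0xaaa', '0xccc'])
const kept = arrayFilterIn(balances, "owner", ["0xaaa", "0xccc"]);
console.log(kept.length); // 2
```

Filtering before unnesting is what keeps very large arrays from blowing up row counts (and offsets) downstream.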
## JSON functions

```sql
json_query(json_string, path) -> VARCHAR   -- Query JSON by path
json_value(json_string, path) -> VARCHAR   -- Extract scalar value
json_exists(json_string, path) -> BOOLEAN  -- Check path exists
is_json(string) -> BOOLEAN                 -- Validate JSON
parse_json(json_string) -> JSON            -- Parse (errors on invalid)
try_parse_json(json_string) -> JSON        -- Parse (NULL on invalid)
json_object(key1, val1, ...) -> JSON       -- Construct JSON object
json_array(val1, val2, ...) -> JSON        -- Construct JSON array
```

## Timestamp functions

```sql
to_timestamp(seconds) -> TIMESTAMP
to_timestamp_micros(microseconds) -> TIMESTAMP
now() -> TIMESTAMP                  -- volatile, current time
date_part('hour', timestamp) -> INT -- extract timestamp parts
```

## dynamic_table_check()

Checks membership against a `dynamic_table` you can update at runtime:

```sql
WHERE dynamic_table_check('tracked_wallets', from_address)
```

## String and hex helpers

```sql
_gs_hex_to_byte(hex_string) -> BINARY
_gs_byte_to_hex(bytes) -> VARCHAR
string_to_array(string, delimiter) -> LIST<VARCHAR>
regexp_extract(string, pattern, group_index) -> VARCHAR
regexp_replace(string, pattern, replacement) -> VARCHAR
```

Standard SQL functions are also available: `lower`, `upper`, `trim`, `substring`, `concat`, `replace`, `reverse`, `COALESCE`, `CASE WHEN`, `CAST`.
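The `dynamic_table_check` lookup behaves like membership in a set you can mutate while the pipeline runs. A hypothetical TypeScript model (the lowercase normalization here is an assumption — in practice, normalize addresses consistently on both sides):

```typescript
// Model of dynamic_table_check semantics: rows of the dynamic table act as
// a membership set that can change at runtime. Illustrative only.
const trackedWallets = new Set<string>(["0xaaa", "0xbbb"]);

function dynamicTableCheck(table: Set<string>, value: string): boolean {
  return table.has(value.toLowerCase()); // normalization is an assumption
}

console.log(dynamicTableCheck(trackedWallets, "0xAAA")); // true
trackedWallets.delete("0xaaa");                          // runtime update
console.log(dynamicTableCheck(trackedWallets, "0xAAA")); // false
```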
## Common patterns

```sql
lower('0xABC...')                           -- case-insensitive address
CONCAT('base', '-', COALESCE(addr, ''))     -- composite keys
COALESCE(balance.contract_address, '')      -- null-safe values
to_timestamp(block_timestamp) AS block_time -- timestamp conversion
```

```sql
SELECT
  balance.token_type,
  balance.owner_address,
  contract_address AS token_address,
  CAST(`balance` AS STRING) AS balance_amount
FROM my_balances_source balance
WHERE balance.token_type = 'ERC_20'
  AND balance.balance IS NOT NULL
```

Use `IS NULL` / `IS NOT NULL` to distinguish absent values:

```sql
WHERE balance.token_type IS NULL    -- native token (no token type)
WHERE balance.balance IS NOT NULL   -- only rows with a balance value
```
Filter to a single contract (use `lower()` — log addresses are lowercase):

```yaml
transforms:
  usdc_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM base_transfers
      WHERE address = lower('0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913')
```

Select and rename columns:

```yaml
transforms:
  clean_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        block_number,
        to_timestamp(block_timestamp) AS block_time,
        address AS token_address,
        sender,
        recipient,
        amount
      FROM my_source
```

## Casting and decimals

Available cast targets include `DOUBLE`, `STRING`/`VARCHAR`, `DECIMAL(p,s)`, `INT`, and `BIGINT`.
```sql
-- Convert from raw uint256 to human-readable amounts
(CAST(decoded.event_params[3] AS DOUBLE) / 1e6) AS amount_usdc  -- 6 decimals (USDC)
(CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount_eth  -- 18 decimals (ETH, most ERC-20s)
(CAST(decoded.event_params[3] AS DOUBLE) / 1e8) AS amount_btc   -- 8 decimals (WBTC)
-- Cast to STRING to preserve precision for very large numbers (e.g., raw balances)
CAST(`balance` AS STRING) AS balance_amount
```

| Token | Decimals | Divisor |
|---|---|---|
| USDC, USDT | 6 | `1e6` |
| WBTC | 8 | `1e8` |
| ETH, most ERC-20 | 18 | `1e18` |
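When `DOUBLE` precision isn't acceptable, the same scaling can be done losslessly on integer strings. A TypeScript sketch (the `scaleAmount` helper name is illustrative):

```typescript
// Lossless decimal scaling of a raw integer amount — mirrors the
// CAST(... AS DOUBLE) / 1eN pattern without float precision loss.
function scaleAmount(raw: bigint, decimals: number): string {
  const divisor = 10n ** BigInt(decimals);
  const whole = raw / divisor;
  const frac = (raw % divisor).toString().padStart(decimals, "0");
  return decimals === 0 ? whole.toString() : `${whole}.${frac}`;
}

console.log(scaleAmount(1500000n, 6));   // "1.500000"   — USDC, 6 decimals
console.log(scaleAmount(150000000n, 8)); // "1.50000000" — WBTC, 8 decimals
```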
## CASE expressions

```sql
-- Determine trade side based on asset type
CASE WHEN decoded.event_params[4] = '0' THEN 'BUY' ELSE 'SELL' END AS side
-- Conditional amount extraction
(CASE
   WHEN decoded.event_params[4] = '0'
   THEN CAST(decoded.event_params[6] AS DOUBLE)
   ELSE CAST(decoded.event_params[7] AS DOUBLE)
 END / 1e6) AS amount_usdc
-- Classify transaction types
CASE
  WHEN decoded.event_params[3] = '0x4bfb...' THEN 'taker'
  WHEN decoded.event_params[3] = '0xc5d5...' THEN 'taker'
  ELSE 'maker'
END AS order_type
```

## String literals and hex prefixes

```sql
-- Prepend '0x' to hex values from decoded params
'0x' || decoded.event_params[4] AS condition_id
-- Static string values
'TRADE' AS tx_type
'' AS placeholder_field
```

## Excluding addresses

```sql
WHERE decoded.event_params[1] NOT IN (
  '0x4bfb41d5b3570defd03c39a9a4d8de6bd8b8982e',
  '0xc5d563a36ae78145c45a50134d48a1215220f80a',
  '0x4d97dcd97ec945f40cf65f87097ace5ea0476045'
)
```

## Derived ratios

```sql
-- price = cost / quantity
(CASE
   WHEN decoded.event_params[4] = '0'
   THEN CAST(decoded.event_params[6] AS DOUBLE)
   ELSE CAST(decoded.event_params[7] AS DOUBLE)
 END / CASE
   WHEN decoded.event_params[4] = '0'
   THEN CAST(decoded.event_params[7] AS DOUBLE)
   ELSE CAST(decoded.event_params[6] AS DOUBLE)
 END) AS price
```

## Decode once, filter by `decoded.event_signature` downstream
```yaml
transforms:
  # Single decode step — include ALL event ABIs
  all_decoded:
    type: sql
    primary_key: id
    sql: |
      SELECT
        _gs_log_decode('[...full ABI with all events...]', topics, `data`) AS `decoded`,
        id, block_number, transaction_hash, address, block_timestamp
      FROM raw_logs_source
  # Downstream: each transform filters for one event type
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, decoded.event_params[1] AS from_addr, ...
      FROM all_decoded
      WHERE decoded.event_signature = 'Transfer'
  approvals:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, decoded.event_params[1] AS owner, ...
      FROM all_decoded
      WHERE decoded.event_signature = 'Approval'
```

## Chaining transform steps
```yaml
transforms:
  # Step 1: Decode raw logs
  decoded:
    type: sql
    primary_key: id
    sql: |
      SELECT
        _gs_log_decode('[...]', topics, data) AS decoded,
        id, block_number, transaction_hash, address, block_timestamp
      FROM raw_logs_source
  # Step 2: Extract specific event fields
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id, block_number, block_timestamp, transaction_hash,
        decoded.event_params[1] AS from_address,
        decoded.event_params[2] AS to_address,
        (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount
      FROM decoded
      WHERE decoded.event_signature = 'Transfer'
  # Step 3: Add computed columns
  enriched_transfers:
    type: sql
    primary_key: id
    sql: SELECT *, 'ethereum' AS chain FROM transfers
```

## Combining event types with UNION ALL
```yaml
transforms:
  # Individual event transforms (each with identical output columns)
  transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash,
             decoded.event_params[1] AS user_id,
             (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount,
             'TRANSFER' AS event_type
      FROM decoded_events
      WHERE decoded.event_signature = 'Transfer'
  approvals:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash,
             decoded.event_params[1] AS user_id,
             (CAST(decoded.event_params[3] AS DOUBLE) / 1e18) AS amount,
             'APPROVAL' AS event_type
      FROM decoded_events
      WHERE decoded.event_signature = 'Approval'
  # Combined output
  all_events:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM transfers
      UNION ALL
      SELECT * FROM approvals
```

Every branch must produce identical columns — pad missing ones with `''` or `0`. Use `UNION ALL`, not `UNION`: `UNION` deduplicates rows, which is slower and usually wrong for event data.
A fuller example with padded columns:

```yaml
transforms:
  trades:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash, address,
             decoded.event_params[2] AS user_id,
             decoded.event_params[4] AS asset,
             '' AS condition_id,  -- not applicable for trades
             (CAST(decoded.event_params[6] AS DOUBLE) / 1e6) AS amount_usdc,
             'TRADE' AS tx_type,
             CASE WHEN decoded.event_params[4] = '0' THEN 'BUY' ELSE 'SELL' END AS side,
             (CAST(decoded.event_params[8] AS DOUBLE) / 1e6) AS fee
      FROM decoded_events
      WHERE decoded.event_signature = 'OrderFilled'
  redemptions:
    type: sql
    primary_key: id
    sql: |
      SELECT id, block_number, block_timestamp, transaction_hash, address,
             decoded.event_params[1] AS user_id,
             '' AS asset,  -- not applicable for redemptions
             '0x' || decoded.event_params[4] AS condition_id,
             (CAST(decoded.event_params[6] AS DOUBLE) / 1e6) AS amount_usdc,
             'REDEEM' AS tx_type,
             '' AS side,  -- not applicable for redemptions
             0 AS fee     -- no fee for redemptions
      FROM decoded_events
      WHERE decoded.event_signature = 'PayoutRedemption'
  all_activities:
    type: sql
    primary_key: id
    sql: |
      SELECT * FROM trades
      UNION ALL
      SELECT * FROM redemptions
  activities_v2:
    type: sql
    primary_key: id
    sql: SELECT *, '' AS builder FROM all_activities
```

## Source-level `filter:`
Push filters into the source where possible, so less data enters the pipeline:

```yaml
sources:
  my_logs:
    type: dataset
    dataset_name: matic.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: >-
      address IN ('0xabc...', '0xdef...')
      AND block_number > 50000000
```

Even with `start_at: earliest`, a `block_number` bound in the filter skips history you don't need:
```yaml
sources:
  poly_logs:
    type: dataset
    dataset_name: matic.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: >-
      address IN ('0xabc...', '0xdef...')
      AND block_number >= 82422949
```

## Debugging

- Prefer a source `filter:` over a transform `WHERE` for raw-data filtering.
- Check that every `FROM` names an existing source or transform.
- Ensure the `primary_key` column (usually `id`) exists in the transform's output.
- Null decoded fields usually mean the ABI doesn't match the log: verify `decoded.event_signature`, the `address` filter, and `decoded.event_params[N]` indexing (1-based).
- In `UNION ALL` branches, pad missing columns with `''` or `0` so schemas line up.

```shell
# Inspect a raw source's output
goldsky turbo inspect <pipeline> -n <source_node>
# Inspect a specific transform's output
goldsky turbo inspect <pipeline-name> -n <transform_name>
```

See `templates/multi-event-activity-feed.yaml` for a complete `_gs_log_decode()` + `UNION ALL` pipeline.

## Sink batching
```yaml
sinks:
  my_sink:
    type: clickhouse
    from: my_transform
    table: my_table
    secret_name: MY_SECRET
    primary_key: id
    batch_size: 1000              # rows per batch
    batch_flush_interval: 300ms   # max time before flushing
```

| Setting | Description | Trade-off |
|---|---|---|
| `batch_size` | Max rows accumulated before flushing | Higher = more throughput |
| `batch_flush_interval` | Max wait time before flushing a batch | Lower = lower latency |

Typical profiles:

- Low latency: `batch_flush_interval: 300ms`, `batch_size: 1000`
- Balanced: `batch_flush_interval: 1000ms`, `batch_size: 1000`
- High throughput: `batch_flush_interval: 10s`, `batch_size: 100000`

## TypeScript transforms

Column types map to `string`, `uint64`, `int64`, `float64`, `boolean`, and `bytes`. A TypeScript transform uses the `function invoke(data)` entry point and must handle `null` field values. See `references/typescript-transforms.md` for details.

## Related references and skills

- Dynamic tables: `dynamic_table_check('table_name', column)` — see `references/dynamic-tables.md`
- Solana patterns: `references/solana-patterns.md`
- Other skills: `/turbo-builder`, `/turbo-doctor`, `/turbo-pipelines`, `/turbo-architecture`, `/datasets`, `/turbo-monitor-debug`
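A minimal sketch of a TypeScript transform body using the `invoke(data)` signature. The row field names (`id`, `value`) and the return-`null`-to-drop convention are assumptions for illustration only — `references/typescript-transforms.md` defines the actual contract:

```typescript
// Hypothetical TypeScript transform. In a real pipeline invoke(data) would
// be the exported entry point; field names here are illustrative.
interface TransferRow {
  id: string;
  value: string; // raw uint256 amount as a decimal string
}

function invoke(data: TransferRow): (TransferRow & { value_eth: string }) | null {
  const wei = BigInt(data.value);
  if (wei === 0n) return null; // drop zero-value transfers (assumed convention)
  const wholeEth = wei / 10n ** 18n; // integer ETH, truncating the fraction
  return { ...data, value_eth: wholeEth.toString() };
}

console.log(invoke({ id: "1", value: "2000000000000000000" })); // value_eth: "2"
```

Using `BigInt` for the raw amount avoids the float precision loss discussed in the casting section above.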