Loading...
Loading...
Ingest and transform large data files (CSV/JSON) into Elasticsearch indices. Stream-based processing for files up to 30GB, cross-version migration (ES 8.x ↔ 9.x), custom JavaScript transformations, and reindexing with transforms. Use when you need to load data into Elasticsearch, migrate indices, or transform data during ingestion.
npx skill4agent add walterra/agent-tools es-ingest
logs/*.json
cd {baseDir} && npm install
{baseDir}/scripts/ingest.js --file data.json --target my-index
# NDJSON
cat data.ndjson | {baseDir}/scripts/ingest.js --stdin --target my-index
# CSV
cat data.csv | {baseDir}/scripts/ingest.js --stdin --source-format csv --target my-index
{baseDir}/scripts/ingest.js --file users.csv --source-format csv --target users
CSV support requires node-es-transformer >= 1.1.0.
# csv-options.json
# {
# "columns": true,
# "delimiter": ";",
# "trim": true
# }
{baseDir}/scripts/ingest.js --file users.csv --source-format csv --csv-options csv-options.json --target users
{baseDir}/scripts/ingest.js --file users.csv --source-format csv --infer-mappings --target users
# infer-options.json
# {
# "sampleBytes": 200000,
# "lines_to_sample": 2000
# }
{baseDir}/scripts/ingest.js --file users.csv --source-format csv --infer-mappings --infer-mappings-options infer-options.json --target users
{baseDir}/scripts/ingest.js --file data.json --target my-index --mappings mappings.json
{baseDir}/scripts/ingest.js --file data.json --target my-index --transform transform.js
{baseDir}/scripts/ingest.js --source-index old-index --target new-index
{baseDir}/scripts/ingest.js --source-index logs \
--node https://es8.example.com:9200 --api-key es8-key \
--target new-logs \
--target-node https://es9.example.com:9200 --target-api-key es9-key
--target <index> # Target index name
--file <path> # Source file (supports wildcards, e.g., logs/*.json)
--source-index <name> # Source Elasticsearch index
--stdin # Read NDJSON/CSV from stdin
--node <url> # ES node URL (default: http://localhost:9200)
--api-key <key> # API key authentication
--username <user> # Basic auth username
--password <pass> # Basic auth password
--target-node <url> # Target ES node URL (uses --node if not specified)
--target-api-key <key> # Target API key
--target-username <user> # Target username
--target-password <pass> # Target password
--mappings <file.json> # Mappings file (auto-copy from source if reindexing)
--infer-mappings # Infer mappings/pipeline from file/stream
--infer-mappings-options <file> # Options for inference (JSON file)
--delete-index # Delete target index if exists
--pipeline <name> # Ingest pipeline name
--transform <file.js> # Transform function (export as default or module.exports)
--query <file.json> # Query file to filter source documents
--source-format <fmt> # Source format: ndjson|csv (default: ndjson)
--csv-options <file> # CSV parser options (JSON file)
--skip-header # Skip first line (e.g., CSV header)
--buffer-size <kb> # Buffer size in KB (default: 5120)
--search-size <n> # Docs per search when reindexing (default: 100)
--total-docs <n> # Total docs for progress bar (file/stream)
--stall-warn-seconds <n> # Stall warning threshold (default: 30)
--progress-mode <mode> # Progress output: auto|line|newline (default: auto)
--debug-events # Log pause/resume/stall events
--quiet # Disable progress bars
// ES modules (default)
export default function transform(doc) {
return {
...doc,
full_name: `${doc.first_name} ${doc.last_name}`,
timestamp: new Date().toISOString(),
};
}
// Or CommonJS
module.exports = function transform(doc) {
return {
...doc,
full_name: `${doc.first_name} ${doc.last_name}`,
};
};
Return `null` or `undefined` from the transform to skip a document:
export default function transform(doc) {
// Skip invalid documents
if (!doc.email || !doc.email.includes('@')) {
return null;
}
return doc;
}
export default function transform(doc) {
// Split a tweet into multiple hashtag documents
const hashtags = doc.text.match(/#\w+/g) || [];
return hashtags.map(tag => ({
hashtag: tag,
tweet_id: doc.id,
created_at: doc.created_at,
}));
}
{baseDir}/scripts/ingest.js --source-index old-logs --target new-logs
{
"properties": {
"@timestamp": { "type": "date" },
"message": { "type": "text" },
"user": {
"properties": {
"name": { "type": "keyword" },
"email": { "type": "keyword" }
}
}
}
}
{baseDir}/scripts/ingest.js --file data.json --target my-index --mappings mappings.json
{
"range": {
"@timestamp": {
"gte": "2024-01-01",
"lt": "2024-02-01"
}
}
}
{baseDir}/scripts/ingest.js \
--source-index logs \
--target filtered-logs \
--query filter.json
# 1. Create mappings.json with your schema
cat > mappings.json << 'EOF'
{
"properties": {
"timestamp": { "type": "date" },
"user_id": { "type": "keyword" },
"action": { "type": "keyword" },
"value": { "type": "double" }
}
}
EOF
# 2. Ingest CSV (skip header row)
{baseDir}/scripts/ingest.js \
--file events.csv \
--target events \
--mappings mappings.json \
--skip-header
# 1. Create transform.js to update document structure
cat > transform.js << 'EOF'
export default function transform(doc) {
// Update field names for ES 9.x compatibility
return {
...doc,
'@timestamp': doc.timestamp, // Copy into @timestamp (the original field is still kept by ...doc)
user: {
id: doc.user_id,
name: doc.user_name,
},
};
}
EOF
# 2. Migrate with transform
{baseDir}/scripts/ingest.js \
--source-index logs \
--node https://es8-cluster.example.com:9200 \
--api-key $ES8_API_KEY \
--target logs-v2 \
--target-node https://es9-cluster.example.com:9200 \
--target-api-key $ES9_API_KEY \
--transform transform.js
# 1. Create filter query
cat > filter.json << 'EOF'
{
"bool": {
"must": [
{ "range": { "@timestamp": { "gte": "now-30d" } } }
],
"must_not": [
{ "term": { "status": "deleted" } }
]
}
}
EOF
# 2. Reindex with filter (delete old index first)
{baseDir}/scripts/ingest.js \
--source-index logs-raw \
--target logs-filtered \
--query filter.json \
--delete-index
# Ingest all JSON files in a directory
{baseDir}/scripts/ingest.js \
--file "logs/*.json" \
--target combined-logs \
--mappings mappings.json
# 1. Create enrichment transform
cat > enrich.js << 'EOF'
export default function transform(doc) {
return {
...doc,
enriched_at: new Date().toISOString(),
source: 'batch-import',
year: new Date(doc.timestamp).getFullYear(),
};
}
EOF
# 2. Ingest with enrichment
{baseDir}/scripts/ingest.js \
--file data.json \
--target enriched-data \
--transform enrich.js
# Increase buffer size for better throughput
{baseDir}/scripts/ingest.js \
--file huge-file.json \
--target my-index \
--buffer-size 10240 # 10 MB buffer
# Reduce batch size to avoid timeouts
{baseDir}/scripts/ingest.js \
--source-index remote-logs \
--target local-logs \
--search-size 50 # Smaller batches
# Disable progress bars for automated scripts
{baseDir}/scripts/ingest.js \
--file data.json \
--target my-index \
--quiet
# Test connection
curl http://localhost:9200
# Or with auth
curl -H "Authorization: ApiKey $API_KEY" https://es.example.com:9200
{baseDir}/scripts/ingest.js --file data.json --target my-index --buffer-size 2048
// ✓ Correct (ES modules)
export default function transform(doc) { /* ... */ }
// ✓ Correct (CommonJS)
module.exports = function transform(doc) { /* ... */ }
// ✗ Wrong (the function is not exported)
function transform(doc) { /* ... */ }
{baseDir}/scripts/ingest.js \
--file data.json \
--target my-index \
--mappings mappings.json \
--delete-index