es-ingest

ES Ingest

Stream-based ingestion and transformation of large data files into Elasticsearch. Built on node-es-transformer.

Features

  • Stream-based: Handle very large files (20-30 GB tested) without running out of memory
  • High throughput: Up to 20k documents/second on commodity hardware
  • Cross-version: Seamlessly migrate between ES 8.x and 9.x
  • Transformations: Apply custom JavaScript transforms during ingestion
  • Reindexing: Copy and transform existing indices
  • Wildcards: Ingest multiple files matching a pattern (e.g., logs/*.json)
  • Document splitting: Transform one source document into multiple targets

Prerequisites

  • Elasticsearch 8.x or 9.x accessible (local or remote)
  • Node.js 22+ installed

Setup

Before first use, install dependencies:
bash
cd {baseDir} && npm install

Basic Usage

Ingest a JSON file

bash
{baseDir}/scripts/ingest.js --file data.json --target my-index

Stream NDJSON/CSV via stdin

bash
# NDJSON
cat data.ndjson | {baseDir}/scripts/ingest.js --stdin --target my-index

# CSV
cat data.csv | {baseDir}/scripts/ingest.js --stdin --source-format csv --target my-index
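
When the documents come from another program rather than a file, they can be serialized as NDJSON and piped into `--stdin`. The following is a minimal sketch; the script name `emit-ndjson.js` and the sample records are assumptions made for illustration and are not part of es-ingest.

```javascript
// emit-ndjson.js (hypothetical helper, not part of es-ingest)

// Serializes an array of objects as NDJSON: one JSON document per line,
// with a trailing newline after the last document.
function toNdjson(docs) {
  return docs.map((doc) => JSON.stringify(doc)).join('\n') + '\n';
}

// Sample records, made up for illustration.
const docs = [
  { id: 1, message: 'first' },
  { id: 2, message: 'second' },
];

process.stdout.write(toNdjson(docs));
```

It could then be piped in the same way as `cat` above: `node emit-ndjson.js | {baseDir}/scripts/ingest.js --stdin --target my-index`.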

Ingest CSV directly

bash
{baseDir}/scripts/ingest.js --file users.csv --source-format csv --target users
CSV support requires node-es-transformer >= 1.1.0.

Ingest CSV with parser options

bash
# csv-options.json
# {
#   "columns": true,
#   "delimiter": ";",
#   "trim": true
# }

{baseDir}/scripts/ingest.js --file users.csv --source-format csv --csv-options csv-options.json --target users
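
To make the intent of these three options concrete, here is a hand-rolled sketch of what they would mean under their conventional reading: the first row supplies the column names, `;` separates fields, and surrounding whitespace is trimmed. This is an illustration only. `parseSimpleCsv` is a hypothetical function, it does not handle quoted fields, and it is not the parser es-ingest uses.

```javascript
// Illustration only: a tiny CSV reader for plain, unquoted fields.
// The option names mirror csv-options.json; their semantics here are assumed.
function parseSimpleCsv(text, { delimiter = ',', trim = false, columns = false } = {}) {
  const rows = text
    .split(/\r?\n/)
    .filter((line) => line.length > 0)
    .map((line) => line.split(delimiter).map((cell) => (trim ? cell.trim() : cell)));

  if (!columns) return rows; // arrays of cells, header row included

  // With columns enabled, the first row names the fields of every record.
  const [header, ...records] = rows;
  return records.map((cells) =>
    Object.fromEntries(header.map((name, i) => [name, cells[i]]))
  );
}

const sample = 'name; email\nAda ; ada@example.com\n';
console.log(parseSimpleCsv(sample, { columns: true, delimiter: ';', trim: true }));
```

Under these assumptions each CSV row becomes one object keyed by the header names, which is the shape a transform function would then receive.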

Infer mappings/pipeline from CSV

bash
{baseDir}/scripts/ingest.js --file users.csv --source-format csv --infer-mappings --target users

Infer mappings with options

bash
# infer-options.json
# {
#   "sampleBytes": 200000,
#   "lines_to_sample": 2000
# }

{baseDir}/scripts/ingest.js --file users.csv --source-format csv --infer-mappings --infer-mappings-options infer-options.json --target users

Ingest with custom mappings

bash
{baseDir}/scripts/ingest.js --file data.json --target my-index --mappings mappings.json

Ingest with transformation

bash
{baseDir}/scripts/ingest.js --file data.json --target my-index --transform transform.js

Reindex from another index

bash
{baseDir}/scripts/ingest.js --source-index old-index --target new-index

Cross-cluster reindex (ES 8.x → 9.x)

bash
{baseDir}/scripts/ingest.js --source-index logs \
  --node https://es8.example.com:9200 --api-key es8-key \
  --target new-logs \
  --target-node https://es9.example.com:9200 --target-api-key es9-key

Command Reference

Required Options

bash
--target <index>         # Target index name

Source Options (choose one)

bash
--file <path>            # Source file (supports wildcards, e.g., logs/*.json)
--source-index <name>    # Source Elasticsearch index
--stdin                  # Read NDJSON/CSV from stdin

Elasticsearch Connection

bash
--node <url>             # ES node URL (default: http://localhost:9200)
--api-key <key>          # API key authentication
--username <user>        # Basic auth username
--password <pass>        # Basic auth password

Target Connection (for cross-cluster)

bash
--target-node <url>      # Target ES node URL (uses --node if not specified)
--target-api-key <key>   # Target API key
--target-username <user> # Target username
--target-password <pass> # Target password

Index Configuration

bash
--mappings <file.json>          # Mappings file (auto-copy from source if reindexing)
--infer-mappings                # Infer mappings/pipeline from file/stream
--infer-mappings-options <file> # Options for inference (JSON file)
--delete-index                  # Delete target index if exists
--pipeline <name>               # Ingest pipeline name

Processing

bash
--transform <file.js>    # Transform function (export as default or module.exports)
--query <file.json>      # Query file to filter source documents
--source-format <fmt>    # Source format: ndjson|csv (default: ndjson)
--csv-options <file>     # CSV parser options (JSON file)
--skip-header            # Skip first line (e.g., CSV header)

Performance

bash
--buffer-size <kb>       # Buffer size in KB (default: 5120)
--search-size <n>        # Docs per search when reindexing (default: 100)
--total-docs <n>         # Total docs for progress bar (file/stream)
--stall-warn-seconds <n> # Stall warning threshold (default: 30)
--progress-mode <mode>   # Progress output: auto|line|newline (default: auto)
--debug-events           # Log pause/resume/stall events
--quiet                  # Disable progress bars

Transform Functions

Transform functions let you modify documents during ingestion. Create a JavaScript file that exports a transform function:

Basic Transform (transform.js)

javascript
// ES modules (default)
export default function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
    timestamp: new Date().toISOString(),
  };
}

// Or CommonJS
module.exports = function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
  };
};
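
With the template-string version, a document that lacks `last_name` ends up with the literal text "undefined" in `full_name`. A possible variant that tolerates missing name parts is sketched below; it uses the CommonJS export style and assumes the same `first_name` / `last_name` fields as the basic example.

```javascript
// transform.js: variant that tolerates missing name parts (sketch)
function transform(doc) {
  // Keep only the parts that are present, then join them with a space.
  const fullName = [doc.first_name, doc.last_name].filter(Boolean).join(' ');
  return { ...doc, full_name: fullName };
}

module.exports = transform;
```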

Skip Documents

Return null or undefined to skip a document:
javascript
export default function transform(doc) {
  // Skip invalid documents
  if (!doc.email || !doc.email.includes('@')) {
    return null;
  }
  return doc;
}
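
Skipping and cleaning can be combined in one function. The sketch below assumes the same `email` field as the example above; it also guards against non-string values, which would make a direct `doc.email.includes(...)` call throw.

```javascript
// transform.js: skip invalid documents and normalize the rest (sketch)
function transform(doc) {
  // Treat anything that is not a string as a missing email.
  const email = typeof doc.email === 'string' ? doc.email.trim().toLowerCase() : '';
  if (!email.includes('@')) {
    return null; // skipped, as described above
  }
  return { ...doc, email };
}

module.exports = transform;
```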

Split Documents

Return an array to create multiple target documents from one source:
javascript
export default function transform(doc) {
  // Split a tweet into multiple hashtag documents
  const hashtags = doc.text.match(/#\w+/g) || [];
  return hashtags.map(tag => ({
    hashtag: tag,
    tweet_id: doc.id,
    created_at: doc.created_at,
  }));
}
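
A slightly more defensive version of the same split is sketched below. It assumes the same `text`, `id`, and `created_at` fields, tolerates documents without `text`, removes duplicate hashtags case-insensitively, and returns `null` (the documented way to skip) when a tweet has no hashtags, rather than relying on how an empty array is handled.

```javascript
// transform.js: split a tweet into unique hashtag documents (sketch)
function transform(doc) {
  const text = typeof doc.text === 'string' ? doc.text : '';
  const matches = text.match(/#\w+/g) || [];

  // Lowercase before de-duplicating so "#ES" and "#es" count once.
  const hashtags = [...new Set(matches.map((tag) => tag.toLowerCase()))];
  if (hashtags.length === 0) {
    return null; // nothing to index for this tweet
  }

  return hashtags.map((tag) => ({
    hashtag: tag,
    tweet_id: doc.id,
    created_at: doc.created_at,
  }));
}

module.exports = transform;
```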

Mappings

Auto-Copy Mappings (Reindexing)

When reindexing, mappings are automatically copied from the source index:
bash
{baseDir}/scripts/ingest.js --source-index old-logs --target new-logs

Custom Mappings (mappings.json)

json
{
  "properties": {
    "@timestamp": { "type": "date" },
    "message": { "type": "text" },
    "user": {
      "properties": {
        "name": { "type": "keyword" },
        "email": { "type": "keyword" }
      }
    }
  }
}
bash
{baseDir}/scripts/ingest.js --file data.json --target my-index --mappings mappings.json
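
Source records often do not arrive in the shape the mappings describe. The sketch below reshapes a flat record into the nested structure of this `mappings.json`. The input field names (`ts`, `msg`, `user_name`, `user_email`) are hypothetical and stand in for whatever the source data actually uses.

```javascript
// transform.js: reshape flat records to match mappings.json (sketch)
function transform(doc) {
  return {
    '@timestamp': doc.ts, // mapped as "date"
    message: doc.msg,     // mapped as "text"
    user: {
      name: doc.user_name,   // mapped as "keyword"
      email: doc.user_email, // mapped as "keyword"
    },
  };
}

module.exports = transform;
```

It would be passed with `--transform transform.js` alongside `--mappings mappings.json`.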

Query Filters

Filter source documents during reindexing with a query file:

Query File (filter.json)

json
{
  "range": {
    "@timestamp": {
      "gte": "2024-01-01",
      "lt": "2024-02-01"
    }
  }
}
bash
{baseDir}/scripts/ingest.js \
  --source-index logs \
  --target filtered-logs \
  --query filter.json
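
For recurring jobs, the query file can be generated instead of edited by hand. The sketch below assumes a hypothetical script named `make-filter.js` and produces the same bare `range` clause as `filter.json` above for any given month.

```javascript
// make-filter.js (hypothetical helper, not part of es-ingest)

// Builds a range clause covering one calendar month.
// month is 1-12; Date.UTC takes a 0-based month and rolls over past December.
function monthRange(year, month) {
  const start = new Date(Date.UTC(year, month - 1, 1));
  const end = new Date(Date.UTC(year, month, 1));
  const day = (date) => date.toISOString().slice(0, 10); // YYYY-MM-DD
  return { range: { '@timestamp': { gte: day(start), lt: day(end) } } };
}

// Print the filter for January 2024, matching filter.json above.
console.log(JSON.stringify(monthRange(2024, 1), null, 2));
```

Redirecting the output, as in `node make-filter.js > filter.json`, yields a file usable with `--query`.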

Common Patterns

Pattern 1: Load CSV with custom mappings

bash
# 1. Create mappings.json with your schema
cat > mappings.json << 'EOF'
{
  "properties": {
    "timestamp": { "type": "date" },
    "user_id": { "type": "keyword" },
    "action": { "type": "keyword" },
    "value": { "type": "double" }
  }
}
EOF

# 2. Ingest CSV (skip header row)
{baseDir}/scripts/ingest.js \
  --file events.csv \
  --target events \
  --mappings mappings.json \
  --skip-header
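
CSV carries no type information, so a field such as `value` may reach the transform as a string. That is an assumption about the parsed input, not something the source confirms. Under that assumption, a transform can convert the field explicitly and skip rows whose value is not numeric. The file name `coerce.js` is hypothetical.

```javascript
// coerce.js: convert "value" to a number before indexing (sketch)
function transform(doc) {
  // Number('') is 0, so empty and missing values are rejected explicitly.
  if (doc.value == null || doc.value === '') {
    return null;
  }
  const value = Number(doc.value);
  if (Number.isNaN(value)) {
    return null; // skip rows with a non-numeric value
  }
  return { ...doc, value };
}

module.exports = transform;
```

It would be added to the ingest command with `--transform coerce.js`.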

Pattern 2: Migrate ES 8.x → 9.x with transforms

bash
# 1. Create transform.js to update document structure
cat > transform.js << 'EOF'
export default function transform(doc) {
  // Update field names for ES 9.x compatibility
  return {
    ...doc,
    '@timestamp': doc.timestamp, // Rename field
    user: {
      id: doc.user_id,
      name: doc.user_name,
    },
  };
}
EOF

# 2. Migrate with transform
{baseDir}/scripts/ingest.js \
  --source-index logs \
  --node https://es8-cluster.example.com:9200 \
  --api-key $ES8_API_KEY \
  --target logs-v2 \
  --target-node https://es9-cluster.example.com:9200 \
  --target-api-key $ES9_API_KEY \
  --transform transform.js
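
Because the Pattern 2 transform spreads `...doc`, the original `timestamp`, `user_id`, and `user_name` fields are copied into the target document next to the new ones. If a true rename is wanted, one option is to pull the old fields out with destructuring first. This is a sketch using the same field names, written in the CommonJS export style.

```javascript
// transform.js: rename fields without keeping the old names (sketch)
function transform(doc) {
  // The named fields are extracted; "rest" holds everything else.
  const { timestamp, user_id, user_name, ...rest } = doc;
  return {
    ...rest,
    '@timestamp': timestamp,
    user: {
      id: user_id,
      name: user_name,
    },
  };
}

module.exports = transform;
```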

Pattern 3: Reindex with filtering and deletion

bash
# 1. Create filter query
cat > filter.json << 'EOF'
{
  "bool": {
    "must": [
      { "range": { "@timestamp": { "gte": "now-30d" } } }
    ],
    "must_not": [
      { "term": { "status": "deleted" } }
    ]
  }
}
EOF

# 2. Reindex with filter (delete the existing target index first)
{baseDir}/scripts/ingest.js \
  --source-index logs-raw \
  --target logs-filtered \
  --query filter.json \
  --delete-index
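
The query file is described for reindexing. For file or stdin sources, a comparable filter can be expressed as a transform that returns `null` for unwanted documents. The sketch below mirrors the Pattern 3 conditions (not `deleted`, `@timestamp` within the last 30 days). The file name `filter-transform.js` and the `makeFilter` factory are hypothetical; the reference time is fixed once when the file is loaded.

```javascript
// filter-transform.js: the Pattern 3 filter as a transform (sketch)
const THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000;

// Returns a transform bound to a fixed "now" (milliseconds since epoch).
function makeFilter(now) {
  return function transform(doc) {
    if (doc.status === 'deleted') {
      return null;
    }
    const timestamp = Date.parse(doc['@timestamp']);
    // Skip documents with an unparseable or too-old timestamp.
    if (Number.isNaN(timestamp) || timestamp < now - THIRTY_DAYS_MS) {
      return null;
    }
    return doc;
  };
}

module.exports = makeFilter(Date.now());
```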

Pattern 4: Batch ingest multiple files

bash
# Ingest all JSON files in a directory
{baseDir}/scripts/ingest.js \
  --file "logs/*.json" \
  --target combined-logs \
  --mappings mappings.json

Pattern 5: Document enrichment during ingestion

bash
# 1. Create enrichment transform
cat > enrich.js << 'EOF'
export default function transform(doc) {
  return {
    ...doc,
    enriched_at: new Date().toISOString(),
    source: 'batch-import',
    year: new Date(doc.timestamp).getFullYear(),
  };
}
EOF

# 2. Ingest with enrichment
{baseDir}/scripts/ingest.js \
  --file data.json \
  --target enriched-data \
  --transform enrich.js
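
In the Pattern 5 transform, `new Date(doc.timestamp).getFullYear()` evaluates to `NaN` when the timestamp cannot be parsed, and the result depends on the local time zone of the machine running the import. A variant that adds `year` only for valid timestamps and uses UTC is sketched below, in the CommonJS export style.

```javascript
// enrich.js: enrichment that tolerates bad timestamps (sketch)
function transform(doc) {
  // new Date(null) is the epoch, so null and undefined are treated as missing.
  const parsed = doc.timestamp == null ? null : new Date(doc.timestamp);
  const valid = parsed !== null && !Number.isNaN(parsed.getTime());

  return {
    ...doc,
    enriched_at: new Date().toISOString(),
    source: 'batch-import',
    // Add "year" only when the timestamp parsed; UTC avoids time zone drift.
    ...(valid ? { year: parsed.getUTCFullYear() } : {}),
  };
}

module.exports = transform;
```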

Performance Tuning

For Large Files (>5GB)

bash
# Increase buffer size for better throughput
{baseDir}/scripts/ingest.js \
  --file huge-file.json \
  --target my-index \
  --buffer-size 10240 # 10 MB buffer

For Slow Networks

bash
# Reduce batch size to avoid timeouts
{baseDir}/scripts/ingest.js \
  --source-index remote-logs \
  --target local-logs \
  --search-size 50 # Smaller batches

Quiet Mode (for scripts)

bash
# Disable progress bars for automated scripts
{baseDir}/scripts/ingest.js \
  --file data.json \
  --target my-index \
  --quiet

When to Use

Use this skill when you need to:
  • Load large files into Elasticsearch without memory issues
  • Migrate indices between ES versions (8.x ↔ 9.x)
  • Transform data during ingestion (enrich, split, filter)
  • Reindex with modifications (rename fields, restructure documents)
  • Batch process multiple files matching a pattern
  • Cross-cluster replication with transformations

When NOT to Use

Consider alternatives for:

Troubleshooting

Connection refused

Elasticsearch is not running or URL is incorrect:
bash
# Test the connection (API key auth shown)
curl -H "Authorization: ApiKey $API_KEY" https://es.example.com:9200

Out of memory errors

Reduce buffer size:
bash
{baseDir}/scripts/ingest.js --file data.json --target my-index --buffer-size 2048

Transform function not loading

Ensure the transform file exports correctly:
javascript
// ✓ Correct (ES modules)
export default function transform(doc) { /* ... */ }

// ✓ Correct (CommonJS)
module.exports = function transform(doc) { /* ... */ }

// ✗ Wrong
function transform(doc) { /* ... */ }

Mapping conflicts

Delete and recreate the index:
bash
{baseDir}/scripts/ingest.js \
  --file data.json \
  --target my-index \
  --mappings mappings.json \
  --delete-index
