es-ingest
ES Ingest
Stream-based ingestion and transformation of large data files into Elasticsearch. Built on node-es-transformer.
Features
- ✅ Stream-based: Handle very large files (20-30 GB tested) without running out of memory
- ✅ High throughput: Up to 20k documents/second on commodity hardware
- ✅ Cross-version: Seamlessly migrate between ES 8.x and 9.x
- ✅ Transformations: Apply custom JavaScript transforms during ingestion
- ✅ Reindexing: Copy and transform existing indices
- ✅ Wildcards: Ingest multiple files matching a pattern (e.g., logs/*.json)
- ✅ Document splitting: Transform one source document into multiple targets
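To make the stream-based idea concrete, here is a minimal sketch of line-by-line NDJSON processing in Node.js. It is not node-es-transformer's implementation. The function name `countNdjsonDocs` and the in-memory sample input are invented for illustration. The point is only that memory use depends on the longest line, not on the size of the whole input.

```javascript
const readline = require('node:readline');
const { Readable } = require('node:stream');

// Read NDJSON from any readable stream one line at a time.
// Only the current line is held in memory, never the whole input.
async function countNdjsonDocs(input) {
  const lines = readline.createInterface({ input, crlfDelay: Infinity });
  let count = 0;
  for await (const line of lines) {
    if (line.trim() === '') continue; // ignore blank lines
    JSON.parse(line); // throws if a line is not valid JSON
    count += 1;
  }
  return count;
}

// Hypothetical in-memory input standing in for a large file or stdin.
const sample = Readable.from(['{"id":1}\n{"id":2}\n', '\n{"id":3}\n']);
const result = countNdjsonDocs(sample);
result.then((n) => console.log(`parsed ${n} documents`));
```

Passing `fs.createReadStream('data.ndjson')` or `process.stdin` instead of the sample would apply the same loop to real input.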
Prerequisites
- Elasticsearch 8.x or 9.x accessible (local or remote)
- Node.js 22+ installed
Setup
Before first use, install dependencies:
bash
cd {baseDir} && npm install
Basic Usage
Ingest a JSON file
bash
{baseDir}/scripts/ingest.js --file data.json --target my-index
Stream NDJSON/CSV via stdin
bash
# NDJSON
cat data.ndjson | {baseDir}/scripts/ingest.js --stdin --target my-index
# CSV
cat data.csv | {baseDir}/scripts/ingest.js --stdin --source-format csv --target my-index
Ingest CSV directly
bash
{baseDir}/scripts/ingest.js --file users.csv --source-format csv --target users
CSV support requires node-es-transformer >= 1.1.0.
Ingest CSV with parser options
bash
# csv-options.json
# {
#   "columns": true,
#   "delimiter": ";",
#   "trim": true
# }
{baseDir}/scripts/ingest.js --file users.csv --source-format csv --csv-options csv-options.json --target users
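The source does not say which CSV parser reads these options. The sketch below assumes `columns`, `delimiter`, and `trim` carry the meanings they usually have in CSV parsers: header row used as field names, custom separator, whitespace stripped. `parseCsv` is a deliberately simplified, hypothetical stand-in with no quoting or escaping support, meant only to show the shape of the resulting documents.

```javascript
// Simplified illustration only: real CSV parsers also handle quotes and escapes.
function parseCsv(text, { columns = false, delimiter = ',', trim = false } = {}) {
  const rows = text
    .split(/\r?\n/)
    .filter((line) => line.length > 0)
    .map((line) => line.split(delimiter).map((cell) => (trim ? cell.trim() : cell)));
  if (!columns) return rows; // arrays of cells, header row included
  const [header, ...records] = rows;
  // With columns enabled, the first row supplies the field names.
  return records.map((cells) =>
    Object.fromEntries(header.map((name, i) => [name, cells[i]]))
  );
}

const csvText = 'name; email\nAda; ada@example.com\nLin; lin@example.com\n';
const csvDocs = parseCsv(csvText, { columns: true, delimiter: ';', trim: true });
console.log(csvDocs);
```

Each resulting object corresponds to one document that would be indexed, which is why a wrong delimiter typically shows up as a single oversized field.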
Infer mappings/pipeline from CSV
bash
{baseDir}/scripts/ingest.js --file users.csv --source-format csv --infer-mappings --target users
Infer mappings with options
bash
# infer-options.json
# {
#   "sampleBytes": 200000,
#   "lines_to_sample": 2000
# }
{baseDir}/scripts/ingest.js --file users.csv --source-format csv --infer-mappings --infer-mappings-options infer-options.json --target users
Ingest with custom mappings
bash
{baseDir}/scripts/ingest.js --file data.json --target my-index --mappings mappings.json
Ingest with transformation
bash
{baseDir}/scripts/ingest.js --file data.json --target my-index --transform transform.js
Reindex from another index
bash
{baseDir}/scripts/ingest.js --source-index old-index --target new-index
Cross-cluster reindex (ES 8.x → 9.x)
bash
{baseDir}/scripts/ingest.js --source-index logs \
  --node https://es8.example.com:9200 --api-key es8-key \
  --target new-logs \
  --target-node https://es9.example.com:9200 --target-api-key es9-key
Command Reference
Required Options
bash
--target <index>           # Target index name
Source Options (choose one)
bash
--file <path>              # Source file (supports wildcards, e.g., logs/*.json)
--source-index <name>      # Source Elasticsearch index
--stdin                    # Read NDJSON/CSV from stdin
Elasticsearch Connection
bash
--node <url>               # ES node URL (default: http://localhost:9200)
--api-key <key>            # API key authentication
--username <user>          # Basic auth username
--password <pass>          # Basic auth password
Target Connection (for cross-cluster)
bash
--target-node <url>        # Target ES node URL (uses --node if not specified)
--target-api-key <key>     # Target API key
--target-username <user>   # Target username
--target-password <pass>   # Target password
Index Configuration
bash
--mappings <file.json>           # Mappings file (auto-copy from source if reindexing)
--infer-mappings                 # Infer mappings/pipeline from file/stream
--infer-mappings-options <file>  # Options for inference (JSON file)
--delete-index                   # Delete target index if exists
--pipeline <name>                # Ingest pipeline name
Processing
bash
--transform <file.js>      # Transform function (export as default or module.exports)
--query <file.json>        # Query file to filter source documents
--source-format <fmt>      # Source format: ndjson|csv (default: ndjson)
--csv-options <file>       # CSV parser options (JSON file)
--skip-header              # Skip first line (e.g., CSV header)
Performance
bash
--buffer-size <kb>         # Buffer size in KB (default: 5120)
--search-size <n>          # Docs per search when reindexing (default: 100)
--total-docs <n>           # Total docs for progress bar (file/stream)
--stall-warn-seconds <n>   # Stall warning threshold (default: 30)
--progress-mode <mode>     # Progress output: auto|line|newline (default: auto)
--debug-events             # Log pause/resume/stall events
--quiet                    # Disable progress bars
Transform Functions
Transform functions let you modify documents during ingestion. Create a JavaScript file that exports a transform function:
Basic Transform (transform.js)
javascript
// ES modules (default)
export default function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
    timestamp: new Date().toISOString(),
  };
}
// Or CommonJS
module.exports = function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
  };
};
Skip Documents
Return null or undefined to skip a document:
javascript
export default function transform(doc) {
  // Skip invalid documents
  if (!doc.email || !doc.email.includes('@')) {
    return null;
  }
  return doc;
}
Split Documents
Return an array to create multiple target documents from one source:
javascript
export default function transform(doc) {
  // Split a tweet into multiple hashtag documents
  // (a missing text field yields no documents instead of throwing)
  const hashtags = (doc.text || '').match(/#\w+/g) || [];
  return hashtags.map(tag => ({
    hashtag: tag,
    tweet_id: doc.id,
    created_at: doc.created_at,
  }));
}
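Before running a transform against a large file, it can help to dry-run it on a few sample documents. The helper below is a hypothetical sketch, not part of ingest.js. `applyTransform` simply mirrors the three return conventions described in this section: an object is kept, null or undefined is skipped, and an array is split into several documents.

```javascript
// Hypothetical dry-run helper: applies a transform to sample documents
// using the return conventions described in this section.
function applyTransform(transform, docs) {
  const out = [];
  for (const doc of docs) {
    const result = transform(doc);
    if (result === null || result === undefined) continue; // document skipped
    if (Array.isArray(result)) out.push(...result); // document split
    else out.push(result); // one-to-one
  }
  return out;
}

// Example transform that both skips and splits.
function hashtagTransform(doc) {
  if (!doc.text) return null;
  const hashtags = doc.text.match(/#\w+/g) || [];
  return hashtags.map((tag) => ({ hashtag: tag, tweet_id: doc.id }));
}

const dryRun = applyTransform(hashtagTransform, [
  { id: 1, text: 'Shipping #elasticsearch with #node' },
  { id: 2 }, // no text: skipped
  { id: 3, text: 'no tags here' }, // empty array: produces nothing
]);
console.log(dryRun);
```

Three source documents produce two target documents here, which is worth keeping in mind when estimating a value for --total-docs.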
Mappings
Auto-Copy Mappings (Reindexing)
When reindexing, mappings are automatically copied from the source index:
bash
{baseDir}/scripts/ingest.js --source-index old-logs --target new-logs
Custom Mappings (mappings.json)
json
{
  "properties": {
    "@timestamp": { "type": "date" },
    "message": { "type": "text" },
    "user": {
      "properties": {
        "name": { "type": "keyword" },
        "email": { "type": "keyword" }
      }
    }
  }
}
bash
{baseDir}/scripts/ingest.js --file data.json --target my-index --mappings mappings.json
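For schemas with many fields, writing the nested `properties` structure by hand gets tedious. As a sketch, a small script could generate mappings.json from a flat list of dotted field paths. `buildMappings` is a hypothetical helper, not something the tool provides, and it assumes no path is both a leaf and a parent of other fields.

```javascript
// Hypothetical helper: turn { 'user.name': 'keyword' } style input into the
// nested "properties" structure used in mappings.json.
// Assumes no path is both a leaf field and a parent object.
function buildMappings(fieldTypes) {
  const root = { properties: {} };
  for (const [path, type] of Object.entries(fieldTypes)) {
    const parts = path.split('.');
    let node = root;
    for (const part of parts.slice(0, -1)) {
      // Create intermediate object fields on demand.
      node.properties[part] = node.properties[part] || { properties: {} };
      node = node.properties[part];
    }
    node.properties[parts[parts.length - 1]] = { type };
  }
  return root;
}

const generatedMappings = buildMappings({
  '@timestamp': 'date',
  message: 'text',
  'user.name': 'keyword',
  'user.email': 'keyword',
});
console.log(JSON.stringify(generatedMappings, null, 2));
```

Redirecting the output to a file (for example `node build-mappings.js > mappings.json`, where the script name is made up) gives a file that can be passed to --mappings.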
Query Filters
Filter source documents during reindexing with a query file:
Query File (filter.json)
json
{
  "range": {
    "@timestamp": {
      "gte": "2024-01-01",
      "lt": "2024-02-01"
    }
  }
}
bash
{baseDir}/scripts/ingest.js \
  --source-index logs \
  --target filtered-logs \
  --query filter.json
Common Patterns
Pattern 1: Load CSV with custom mappings
bash
# 1. Create mappings.json with your schema
cat > mappings.json << 'EOF'
{
  "properties": {
    "timestamp": { "type": "date" },
    "user_id": { "type": "keyword" },
    "action": { "type": "keyword" },
    "value": { "type": "double" }
  }
}
EOF
# 2. Ingest CSV (skip header row)
{baseDir}/scripts/ingest.js \
  --file events.csv \
  --target events \
  --mappings mappings.json \
  --skip-header
Pattern 2: Migrate ES 8.x → 9.x with transforms
bash
# 1. Create transform.js to update document structure
cat > transform.js << 'EOF'
export default function transform(doc) {
  // Update field names for ES 9.x compatibility.
  // Destructuring removes the old fields so they are renamed, not copied.
  const { timestamp, user_id, user_name, ...rest } = doc;
  return {
    ...rest,
    '@timestamp': timestamp, // Rename field
    user: {
      id: user_id,
      name: user_name,
    },
  };
}
EOF
# 2. Migrate with transform
{baseDir}/scripts/ingest.js \
  --source-index logs \
  --node https://es8-cluster.example.com:9200 \
  --api-key "$ES8_API_KEY" \
  --target logs-v2 \
  --target-node https://es9-cluster.example.com:9200 \
  --target-api-key "$ES9_API_KEY" \
  --transform transform.js
Pattern 3: Reindex with filtering and deletion
bash
# 1. Create filter query
cat > filter.json << 'EOF'
{
  "bool": {
    "must": [
      { "range": { "@timestamp": { "gte": "now-30d" } } }
    ],
    "must_not": [
      { "term": { "status": "deleted" } }
    ]
  }
}
EOF
# 2. Reindex with filter (delete the existing target index first)
{baseDir}/scripts/ingest.js \
  --source-index logs-raw \
  --target logs-filtered \
  --query filter.json \
  --delete-index
Pattern 4: Batch ingest multiple files
bash
# Ingest all JSON files in a directory
{baseDir}/scripts/ingest.js \
  --file "logs/*.json" \
  --target combined-logs \
  --mappings mappings.json
Pattern 5: Document enrichment during ingestion
bash
# 1. Create enrichment transform
cat > enrich.js << 'EOF'
export default function transform(doc) {
  return {
    ...doc,
    enriched_at: new Date().toISOString(),
    source: 'batch-import',
    year: new Date(doc.timestamp).getFullYear(),
  };
}
EOF
# 2. Ingest with enrichment
{baseDir}/scripts/ingest.js \
  --file data.json \
  --target enriched-data \
  --transform enrich.js
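One caveat with deriving a year this way: `new Date(...)` on a missing or malformed timestamp produces an invalid date, and `getFullYear()` then returns NaN, which may not index cleanly into a numeric field. The variant below is a sketch of a more defensive enrichment transform. Omitting `year` for bad timestamps and using UTC are assumptions about what you want, not behavior prescribed by the tool.

```javascript
// Defensive variant of the enrichment transform (CommonJS export style).
function transform(doc) {
  const parsed = new Date(doc.timestamp);
  const valid = !Number.isNaN(parsed.getTime());
  return {
    ...doc,
    enriched_at: new Date().toISOString(),
    source: 'batch-import',
    // Only add year when the timestamp parses; UTC avoids depending on the
    // time zone of the machine running the ingest.
    ...(valid ? { year: parsed.getUTCFullYear() } : {}),
  };
}

module.exports = transform;

console.log(transform({ id: 1, timestamp: '2024-03-15T10:00:00Z' }).year);
console.log('year' in transform({ id: 2, timestamp: 'not-a-date' }));
```

Returning null for documents with invalid timestamps would be the stricter alternative if such records should not be indexed at all.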
Performance Tuning
For Large Files (>5GB)
bash
# Increase buffer size for better throughput
{baseDir}/scripts/ingest.js \
  --file huge-file.json \
  --target my-index \
  --buffer-size 10240  # 10 MB buffer
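To build intuition for this trade-off, the sketch below groups documents into batches capped by serialized size. It assumes the buffer size limits how many bytes are accumulated before a batch is sent, which the source does not spell out. `batchByBytes` is a hypothetical illustration, not the tool's internal logic.

```javascript
// Hypothetical illustration: group documents into batches whose serialized
// size stays within maxKb kilobytes (a single oversized doc gets its own batch).
function* batchByBytes(docs, maxKb) {
  const limit = maxKb * 1024;
  let batch = [];
  let size = 0;
  for (const doc of docs) {
    const bytes = Buffer.byteLength(JSON.stringify(doc), 'utf8');
    if (batch.length > 0 && size + bytes > limit) {
      yield batch; // flush before the limit would be exceeded
      batch = [];
      size = 0;
    }
    batch.push(doc);
    size += bytes;
  }
  if (batch.length > 0) yield batch; // flush the remainder
}

// Five documents of 508 bytes each against a 1 KB limit.
const sampleDocs = Array.from({ length: 5 }, () => ({ a: 'x'.repeat(500) }));
const batchSizes = [...batchByBytes(sampleDocs, 1)].map((b) => b.length);
console.log(batchSizes);
```

Under this model a larger limit means fewer, bigger requests and more memory held at once, which matches the advice to raise the value for throughput and lower it when memory is tight.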
For Slow Networks
bash
# Reduce batch size to avoid timeouts
{baseDir}/scripts/ingest.js \
  --source-index remote-logs \
  --target local-logs \
  --search-size 50  # Smaller batches
Quiet Mode (for scripts)
bash
# Disable progress bars for automated scripts
{baseDir}/scripts/ingest.js \
  --file data.json \
  --target my-index \
  --quiet
When to Use
Use this skill when you need to:
- Load large files into Elasticsearch without memory issues
- Migrate indices between ES versions (8.x ↔ 9.x)
- Transform data during ingestion (enrich, split, filter)
- Reindex with modifications (rename fields, restructure documents)
- Batch process multiple files matching a pattern
- Copy indices between clusters with transformations
When NOT to Use
Consider alternatives for:
- Real-time ingestion: Use Filebeat or Elastic Agent
- Enterprise pipelines: Use Logstash
- Built-in transforms: Use Elasticsearch Transforms
- Small files (<100MB): Direct Elasticsearch API calls may be simpler
Troubleshooting
Connection refused
Elasticsearch is not running or the URL is incorrect:
bash
# Test connection
curl http://localhost:9200
# Or with auth
curl -H "Authorization: ApiKey $API_KEY" https://es.example.com:9200
Out of memory errors
Reduce the buffer size (default: 5120 KB):
bash
{baseDir}/scripts/ingest.js --file data.json --target my-index --buffer-size 2048
Transform function not loading
Ensure the transform file exports correctly:
javascript
// ✓ Correct (ES modules)
export default function transform(doc) { /* ... */ }
// ✓ Correct (CommonJS)
module.exports = function transform(doc) { /* ... */ }
// ✗ Wrong (the function is never exported)
function transform(doc) { /* ... */ }
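How ingest.js loads the transform file is not shown in this document. The sketch below only illustrates why both export styles can work and why a bare function declaration cannot: a loader has to find a function either under `default` or as the module value itself. `resolveTransform` is a hypothetical name.

```javascript
// Hypothetical loader logic: accept a function exported as the ES module
// default or as the CommonJS module value; reject anything else.
function resolveTransform(loaded) {
  const candidate =
    loaded !== null && typeof loaded === 'object' && 'default' in loaded
      ? loaded.default
      : loaded;
  if (typeof candidate !== 'function') {
    throw new TypeError('Transform file must export a function');
  }
  return candidate;
}

const esmStyle = { default: (doc) => doc }; // shape produced by `export default`
const cjsStyle = (doc) => doc;              // shape produced by `module.exports = fn`
console.log(typeof resolveTransform(esmStyle));
console.log(typeof resolveTransform(cjsStyle));
try {
  resolveTransform({}); // a file that declares but never exports a function
} catch (err) {
  console.log(err.message);
}
```

A file that only declares `function transform` exposes an empty module, so there is nothing for a loader to pick up.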
Mapping conflicts
Delete and recreate the index:
bash
{baseDir}/scripts/ingest.js \
--file data.json \
--target my-index \
--mappings mappings.json \
--delete-index