elasticsearch-file-ingest

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Elasticsearch File Ingest

Elasticsearch 文件导入

Stream-based ingestion and transformation of large data files (NDJSON, CSV, Parquet, Arrow IPC) into Elasticsearch.
基于流的大文件数据(NDJSON、CSV、Parquet、Arrow IPC)导入与转换工具,可将数据导入Elasticsearch。

Features & Use Cases

功能与适用场景

  • Stream-based: Handle large files without running out of memory
  • High throughput: 50k+ documents/second on commodity hardware
  • Cross-version: Seamlessly migrate between ES 8.x and 9.x, or replicate across clusters
  • Formats: NDJSON, CSV, Parquet, Arrow IPC
  • Transformations: Apply custom JavaScript transforms during ingestion (enrich, split, filter)
  • Reindexing: Copy and transform existing indices (rename fields, restructure documents)
  • Batch processing: Ingest multiple files matching a pattern (e.g.,
    logs/*.json
    )
  • Document splitting: Transform one source document into multiple targets
  • 基于流处理:无需担心内存不足即可处理大文件
  • 高吞吐量:在普通硬件上可达到每秒5万+文档的处理速度
  • 跨版本支持:可在ES 8.x与9.x之间无缝迁移,或跨集群复制数据
  • 支持格式:NDJSON、CSV、Parquet、Arrow IPC
  • 自定义转换:导入过程中可应用自定义JavaScript转换(数据增强、拆分、过滤)
  • 重索引:复制并转换现有索引(重命名字段、重构文档结构)
  • 批量处理:导入符合指定模式的多个文件(例如:
    logs/*.json
  • 文档拆分:将单个源文档转换为多个目标文档

Prerequisites

前置要求

  • Elasticsearch 8.x or 9.x accessible (local or remote)
  • Node.js 22+ installed
  • Elasticsearch 8.x 或 9.x 可访问(本地或远程)
  • 已安装 Node.js 22+

Setup

安装配置

This skill is self-contained. The
scripts/
folder and
package.json
live in this skill's directory. Run all commands from this directory. Use absolute paths when referencing data files located elsewhere.
Before first use, install dependencies:
bash
npm install
本工具为独立包,
scripts/
文件夹和
package.json
位于工具目录下。所有命令请在该目录下执行。引用外部数据文件时请使用绝对路径。
首次使用前,请安装依赖:
bash
npm install

Environment Configuration

环境配置

Elasticsearch connection is configured via environment variables. The CLI flags
--node
,
--api-key
,
--username
, and
--password
override environment variables when provided.
Elasticsearch连接通过环境变量配置。提供
--node
--api-key
--username
--password
这些CLI参数时,会覆盖环境变量的设置。

Option 1: Elastic Cloud (recommended for production)

选项1:Elastic Cloud(生产环境推荐)

bash
export ELASTICSEARCH_CLOUD_ID="deployment-name:base64encodedcloudid"
export ELASTICSEARCH_API_KEY="base64encodedapikey"
bash
export ELASTICSEARCH_CLOUD_ID="deployment-name:base64encodedcloudid"
export ELASTICSEARCH_API_KEY="base64encodedapikey"

Option 2: Direct URL with API Key

选项2:通过API Key连接直接URL

bash
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_API_KEY="base64encodedapikey"
bash
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_API_KEY="base64encodedapikey"

Option 3: Basic Authentication

选项3:基础认证

bash
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_USERNAME="elastic"
export ELASTICSEARCH_PASSWORD="changeme"
bash
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_USERNAME="elastic"
export ELASTICSEARCH_PASSWORD="changeme"

Option 4: Local Development with start-local

选项4:使用start-local进行本地开发

For local development and testing, use start-local to quickly spin up Elasticsearch and Kibana using Docker or Podman:
bash
curl -fsSL https://elastic.co/start-local | sh
After installation completes, source the generated
.env
file:
bash
source elastic-start-local/.env
export ELASTICSEARCH_URL="$ES_LOCAL_URL"
export ELASTICSEARCH_API_KEY="$ES_LOCAL_API_KEY"
本地开发和测试时,可使用start-local通过Docker或Podman快速启动Elasticsearch和Kibana:
bash
curl -fsSL https://elastic.co/start-local | sh
安装完成后,加载生成的
.env
文件:
bash
source elastic-start-local/.env
export ELASTICSEARCH_URL="$ES_LOCAL_URL"
export ELASTICSEARCH_API_KEY="$ES_LOCAL_API_KEY"

Optional: Skip TLS verification (development only)

可选:跳过TLS验证(仅开发环境)

bash
export ELASTICSEARCH_INSECURE="true"
bash
export ELASTICSEARCH_INSECURE="true"

Examples

使用示例

Ingest a JSON file

导入JSON文件

bash
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index
bash
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index

Stream NDJSON/CSV via stdin

通过标准输入流导入NDJSON/CSV

bash
undefined
bash
undefined

NDJSON

NDJSON格式

cat /absolute/path/to/data.ndjson | node scripts/ingest.js --stdin --target my-index
cat /absolute/path/to/data.ndjson | node scripts/ingest.js --stdin --target my-index

CSV

CSV格式

cat /absolute/path/to/data.csv | node scripts/ingest.js --stdin --source-format csv --target my-index
undefined
cat /absolute/path/to/data.csv | node scripts/ingest.js --stdin --source-format csv --target my-index
undefined

Ingest CSV directly

直接导入CSV文件

bash
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --target users
bash
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --target users

Ingest Parquet directly

直接导入Parquet文件

bash
node scripts/ingest.js --file /absolute/path/to/users.parquet --source-format parquet --target users
bash
node scripts/ingest.js --file /absolute/path/to/users.parquet --source-format parquet --target users

Ingest Arrow IPC directly

直接导入Arrow IPC文件

bash
node scripts/ingest.js --file /absolute/path/to/users.arrow --source-format arrow --target users
bash
node scripts/ingest.js --file /absolute/path/to/users.arrow --source-format arrow --target users

Ingest CSV with parser options

带解析选项的CSV导入

bash
undefined
bash
undefined

csv-options.json

csv-options.json

{

{

"columns": true,

"columns": true,

"delimiter": ";",

"delimiter": ";",

"trim": true

"trim": true

}

}

node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --csv-options csv-options.json --target users
undefined
node scripts/ingest.js --file /absolute/path/to/users.csv --source-format csv --csv-options csv-options.json --target users
undefined

Infer mappings/pipeline from CSV

从CSV自动推断映射/管道

When using
--infer-mappings
, do not combine it with
--source-format csv
. Inference sends a raw sample to Elasticsearch's
_text_structure/find_structure
endpoint, which returns both mappings and an ingest pipeline with a CSV processor. If
--source-format csv
is also set, CSV is parsed client-side and server-side, resulting in an empty index. Let
--infer-mappings
handle everything:
bash
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --target users
使用
--infer-mappings
时,请勿同时设置
--source-format csv
。自动推断功能会将原始样本发送至Elasticsearch的
_text_structure/find_structure
接口,该接口会返回映射和包含CSV处理器的导入管道。如果同时设置
--source-format csv
,会在客户端和服务端同时解析CSV,导致索引为空。请仅使用
--infer-mappings
来完成所有操作:
bash
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --target users

Infer mappings with options

带选项的映射推断

bash
undefined
bash
undefined

infer-options.json

infer-options.json

{

{

"sampleBytes": 200000,

"sampleBytes": 200000,

"lines_to_sample": 2000

"lines_to_sample": 2000

}

}

node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --infer-mappings-options infer-options.json --target users
undefined
node scripts/ingest.js --file /absolute/path/to/users.csv --infer-mappings --infer-mappings-options infer-options.json --target users
undefined

Ingest with custom mappings

带自定义映射的导入

bash
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json
bash
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json

Ingest with transformation

带转换逻辑的导入

bash
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --transform transform.js
bash
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --transform transform.js

Reindex from another index

从其他索引重索引

bash
node scripts/ingest.js --source-index old-index --target new-index
bash
node scripts/ingest.js --source-index old-index --target new-index

Cross-cluster reindex (ES 8.x → 9.x)

跨集群重索引(ES 8.x → 9.x)

bash
node scripts/ingest.js --source-index logs \
  --node https://es8.example.com:9200 --api-key es8-key \
  --target new-logs \
  --target-node https://es9.example.com:9200 --target-api-key es9-key
bash
node scripts/ingest.js --source-index logs \
  --node https://es8.example.com:9200 --api-key es8-key \
  --target new-logs \
  --target-node https://es9.example.com:9200 --target-api-key es9-key

Command Reference

命令参考

Required Options

必填选项

bash
--target <index>         # Target index name
bash
--target <index>         # 目标索引名称

Source Options (choose one)

源数据选项(三选一)

bash
--file <path>            # Source file (supports wildcards, e.g., logs/*.json)
--source-index <name>    # Source Elasticsearch index
--stdin                  # Read NDJSON/CSV from stdin
bash
--file <path>            # 源文件(支持通配符,例如:logs/*.json)
--source-index <name>    # 源Elasticsearch索引
--stdin                  # 从标准输入读取NDJSON/CSV数据

Elasticsearch Connection

Elasticsearch连接配置

bash
--node <url>             # ES node URL (default: http://localhost:9200)
--api-key <key>          # API key authentication
--username <user>        # Basic auth username
--password <pass>        # Basic auth password
bash
--node <url>             # ES节点URL(默认:http://localhost:9200)
--api-key <key>          # API Key认证
--username <user>        # 基础认证用户名
--password <pass>        # 基础认证密码

Target Connection (for cross-cluster)

目标集群连接配置(跨集群场景)

bash
--target-node <url>      # Target ES node URL (uses --node if not specified)
--target-api-key <key>   # Target API key
--target-username <user> # Target username
--target-password <pass> # Target password
bash
--target-node <url>      # 目标ES节点URL(未指定时使用--node的值)
--target-api-key <key>   # 目标集群API Key
--target-username <user> # 目标集群用户名
--target-password <pass> # 目标集群密码

Index Configuration

索引配置

bash
--mappings <file.json>          # Mappings file (auto-copy from source if reindexing)
--infer-mappings                # Infer mappings/pipeline from file/stream (do NOT combine with --source-format)
--infer-mappings-options <file> # Options for inference (JSON file)
--delete-index                  # Delete target index if exists
--pipeline <name>               # Ingest pipeline name
bash
--mappings <file.json>          # 映射文件(重索引时会自动从源索引复制)
--infer-mappings                # 从文件/流自动推断映射/管道(请勿与--source-format同时使用)
--infer-mappings-options <file> # 自动推断的配置选项(JSON文件)
--delete-index                  # 如果目标索引存在则删除
--pipeline <name>               # 导入管道名称

Processing

处理配置

bash
--transform <file.js>    # Transform function (export as default or module.exports)
--query <file.json>      # Query file to filter source documents
--source-format <fmt>    # Source format: ndjson|csv|parquet|arrow (default: ndjson)
--csv-options <file>     # CSV parser options (JSON file)
--skip-header            # Skip first line (e.g., CSV header)
bash
--transform <file.js>    # 转换函数文件(需导出default或module.exports)
--query <file.json>      # 用于过滤源文档的查询文件
--source-format <fmt>    # 源数据格式:ndjson|csv|parquet|arrow(默认:ndjson)
--csv-options <file>     # CSV解析器配置选项(JSON文件)
--skip-header            # 跳过第一行(例如CSV表头)

Performance

性能配置

bash
--buffer-size <kb>       # Buffer size in KB (default: 5120)
--search-size <n>        # Docs per search when reindexing (default: 100)
--total-docs <n>         # Total docs for progress bar (file/stream)
--stall-warn-seconds <n> # Stall warning threshold (default: 30)
--progress-mode <mode>   # Progress output: auto|line|newline (default: auto)
--debug-events           # Log pause/resume/stall events
--quiet                  # Disable progress bars
bash
--buffer-size <kb>       # 缓冲区大小(KB,默认:5120)
--search-size <n>        # 重索引时每次搜索返回的文档数(默认:100)
--total-docs <n>         # 进度条显示的总文档数(文件/流场景)
--stall-warn-seconds <n> # 停滞警告阈值(秒,默认:30)
--progress-mode <mode>   # 进度输出模式:auto|line|newline(默认:auto)
--debug-events           # 记录暂停/恢复/停滞事件
--quiet                  # 禁用进度条

Transform Functions

转换函数

Transform functions let you modify documents during ingestion. Create a JavaScript file that exports a transform function:
转换函数可让你在导入过程中修改文档。创建一个JavaScript文件并导出转换函数:

Basic Transform (transform.js)

基础转换示例(transform.js)

javascript
// ES modules (default)
export default function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
    timestamp: new Date().toISOString(),
  };
}

// Or CommonJS
module.exports = function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
  };
};
javascript
// ES模块(默认方式)
export default function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
    timestamp: new Date().toISOString(),
  };
}

// 或CommonJS格式
module.exports = function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
  };
};

Skip Documents

跳过文档

Return
null
or
undefined
to skip a document:
javascript
export default function transform(doc) {
  // Skip invalid documents
  if (!doc.email || !doc.email.includes("@")) {
    return null;
  }
  return doc;
}
返回
null
undefined
即可跳过当前文档:
javascript
export default function transform(doc) {
  // 跳过无效文档
  if (!doc.email || !doc.email.includes("@")) {
    return null;
  }
  return doc;
}

Split Documents

拆分文档

Return an array to create multiple target documents from one source:
javascript
export default function transform(doc) {
  // Split a tweet into multiple hashtag documents
  const hashtags = doc.text.match(/#\w+/g) || [];
  return hashtags.map((tag) => ({
    hashtag: tag,
    tweet_id: doc.id,
    created_at: doc.created_at,
  }));
}
返回数组即可将单个源文档拆分为多个目标文档:
javascript
export default function transform(doc) {
  // 将一条推文拆分为多个话题标签文档
  const hashtags = doc.text.match(/#\w+/g) || [];
  return hashtags.map((tag) => ({
    hashtag: tag,
    tweet_id: doc.id,
    created_at: doc.created_at,
  }));
}

Mappings

映射配置

Auto-Copy Mappings (Reindexing)

重索引时自动复制映射

When reindexing, mappings are automatically copied from the source index:
bash
node scripts/ingest.js --source-index old-logs --target new-logs
重索引场景下,映射会自动从源索引复制:
bash
node scripts/ingest.js --source-index old-logs --target new-logs

Custom Mappings (mappings.json)

自定义映射(mappings.json)

json
{
  "properties": {
    "@timestamp": { "type": "date" },
    "message": { "type": "text" },
    "user": {
      "properties": {
        "name": { "type": "keyword" },
        "email": { "type": "keyword" }
      }
    }
  }
}
bash
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json
json
{
  "properties": {
    "@timestamp": { "type": "date" },
    "message": { "type": "text" },
    "user": {
      "properties": {
        "name": { "type": "keyword" },
        "email": { "type": "keyword" }
      }
    }
  }
}
bash
node scripts/ingest.js --file /absolute/path/to/data.json --target my-index --mappings mappings.json

Query Filters

查询过滤

Filter source documents during reindexing with a query file:
重索引时可通过查询文件过滤源文档:

Query File (filter.json)

查询文件示例(filter.json)

json
{
  "range": {
    "@timestamp": {
      "gte": "2024-01-01",
      "lt": "2024-02-01"
    }
  }
}
bash
node scripts/ingest.js \
  --source-index logs \
  --target filtered-logs \
  --query filter.json
json
{
  "range": {
    "@timestamp": {
      "gte": "2024-01-01",
      "lt": "2024-02-01"
    }
  }
}
bash
node scripts/ingest.js \
  --source-index logs \
  --target filtered-logs \
  --query filter.json

Boundaries

注意事项

  • Never run destructive commands (such as using the
    --delete-index
    flag or deleting existing indices and data) without explicit user confirmation.
  • 切勿在未获得用户明确确认的情况下执行破坏性命令(例如使用
    --delete-index
    参数或删除现有索引及数据)。

Guidelines

使用指南

  • Never combine
    --infer-mappings
    with
    --source-format
    . Inference creates a server-side ingest pipeline that handles parsing (e.g., CSV processor). Using
    --source-format csv
    parses client-side as well, causing double-parsing and an empty index. Use
    --infer-mappings
    alone for automatic detection, or
    --source-format
    with explicit
    --mappings
    for manual control.
  • Use
    --source-format csv
    with
    --mappings
    when you want client-side CSV parsing with known field types.
  • Use
    --infer-mappings
    alone
    when you want Elasticsearch to detect the format, infer field types, and create an ingest pipeline automatically.
  • 切勿同时使用
    --infer-mappings
    --source-format
    。自动推断功能会创建一个服务端导入管道来处理解析(例如CSV处理器)。使用
    --source-format csv
    会在客户端同时解析,导致双重解析并生成空索引。如需自动检测,请单独使用
    --infer-mappings
    ;如需手动控制,请结合
    --source-format
    和显式的
    --mappings
    参数。
  • 当需要客户端CSV解析且已知字段类型时,请结合使用
    --source-format csv
    --mappings
  • 当需要Elasticsearch自动检测格式、推断字段类型并创建导入管道时,请单独使用
    --infer-mappings

When NOT to Use

不适用场景

Consider alternatives for:
以下场景请考虑替代方案:

Additional Resources

额外资源

  • Common Patterns - Detailed examples for CSV loading, migrations, filtering, and more
  • Troubleshooting - Solutions for common issues
  • 常见模式 - CSV加载、迁移、过滤等场景的详细示例
  • 故障排查 - 常见问题的解决方案

References

参考链接