ingest-pipelines
Skill authority
The rules and patterns defined in this skill and its reference files are the authoritative source of truth. When examining existing integrations in the `elastic/integrations` repository for reference, you may encounter patterns that conflict with what is specified here; many integrations contain legacy patterns that predate current standards. Always follow this skill over patterns observed in other integrations. If a reference integration uses a deprecated or prohibited pattern, do not copy it.

When to use
Use this skill when tasks include:

- building or modifying `elasticsearch/ingest_pipeline/default.yml` for a data stream
- choosing parser and normalization processors (`grok`, `dissect`, `json`, `kv`, `date`, `convert`)
- designing conditional branches and sub-pipeline routing with `pipeline` processors
- implementing resilient error handling with top-level `on_failure`
- tuning processor order for ingest performance and maintainability
When not to use
Do not use this skill as the primary guide for:

- ECS field selection, categorization values, and field mapping strategy (`ecs-field-mappings`)
- elastic-package command and stack lifecycle workflows (`elastic-package-cli`)
- test fixture authoring and expected output workflows (`integration-testing` → `references/pipeline-testing.md`)
Pipeline anatomy
In integration packages, ingest pipelines live under `data_stream/<stream>/elasticsearch/ingest_pipeline/`. Every stream usually has a `default.yml` with:

- `description`
- `processors` list
- optional pipeline-level `on_failure`

Keep `default.yml` readable and focused. Move large format-specific logic into sub-pipelines where needed.
ECS version
Set the pipeline ECS reference version explicitly at the top of `processors` (after any introductory processors you already use). Use `9.3.0`; do not pin an older ECS version.

```yaml
- set:
    field: ecs.version
    tag: set_ecs_version
    value: '9.3.0'
```
Rename vs set (mapping to ECS)
When moving a value from a custom or vendor field into an ECS field, prefer the `rename` processor so the source field is removed and you avoid duplicate data. Use `set` with `copy_from` only when you must keep the source field or when `rename` is not applicable.
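A minimal sketch of the two options, assuming a hypothetical vendor field `json.user_name` (produced by an earlier `json` processor) that maps to ECS `user.name`. They are alternatives; a pipeline would use one or the other, not both:

```yaml
# Preferred: move the vendor value into ECS. json.user_name (hypothetical) no longer exists afterwards.
- rename:
    field: json.user_name
    target_field: user.name
    ignore_missing: true
    tag: rename_user_name_to_ecs

# Alternative, only when json.user_name must stay in the document: copy instead of move.
- set:
    field: user.name
    copy_from: json.user_name
    if: ctx.json?.user_name != null
    tag: set_user_name_from_json
```

The `if` guard on the `set` variant keeps it from running when the hypothetical source field is absent.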
Processor tags
Every processor in the pipeline should have a `tag` (not only processors that can fail). Tags make failures and telemetry attributable to a specific step.
CEL-only opening processors (Agentless metadata and error-only documents)
For CEL-based integrations only, include these before the standard `message` → `event.original` handling when they apply:

- `remove`: drop Agentless metadata fields (`organization`, `division`, `team`) when all are strings, so they do not collide with ECS. Use `ignore_missing: true` and a conditional `if`.
- `terminate`: stop processing when the document is an error placeholder from the collector (`ctx.error?.message != null && ctx.message == null && ctx.event?.original == null`).

Non-CEL integrations (logs, syslog, filebeat-style inputs) must not copy this block blindly: those fields and error shapes are specific to the CEL/Agentless path. See the `create-integration` skill: the orchestrator must only expect this block when the data stream uses CEL input.
Standard opening: ECS, optional CEL block, JSE00001, then parse `event.original`
After the optional CEL-only processors, the pipeline should follow this shape. All parsing (`json`, `csv`, `grok`, etc.) runs on **`event.original`**. Never overwrite or mutate `event.original` in later processors; derive structured fields into other paths (for example `json`, `_temp.*`, ECS fields).

```yaml
description: Parse <dataset> events.
processors:
  - set:
      field: ecs.version
      tag: set_ecs_version
      value: '9.3.0'
  # --- CEL input only (omit for log/syslog-only streams) ---
  - remove:
      field:
        - organization
        - division
        - team
      ignore_missing: true
      if: ctx.organization instanceof String && ctx.division instanceof String && ctx.team instanceof String
      tag: remove_agentless_tags
      description: >-
        Removes the fields added by Agentless as metadata,
        as they can collide with ECS fields.
  - terminate:
      tag: data_collection_error
      if: ctx.error?.message != null && ctx.message == null && ctx.event?.original == null
      description: error message set and no data to process.
  # --- end CEL-only ---
  - rename:
      field: message
      tag: rename_message_to_event_original
      target_field: event.original
      ignore_missing: true
      description: Renames the original `message` field to `event.original` to store a copy of the original message. The `event.original` field is not touched if the document already has one; it may happen when Logstash sends the document.
      if: ctx.event?.original == null
  - remove:
      field: message
      tag: remove_message
      ignore_missing: true
      description: The `message` field is no longer required if the document has an `event.original` field.
      if: ctx.event?.original != null
  # Parse (always read from event.original; do not modify event.original)
  - json:
      field: event.original
      target_field: json
      tag: parse_json
      if: ctx.event?.original != null
  # ... normalize, enrich, ECS categorization, cleanup ...
  - append:
      field: tags
      value: preserve_original_event
      allow_duplicates: false
      if: ctx.error?.message != null
      tag: append_preserve_original_event_on_error
on_failure:
  - append:
      field: error.message
      value: >-
        Processor '{{{ _ingest.on_failure_processor_type }}}'
        {{{#_ingest.on_failure_processor_tag}}}with tag '{{{ _ingest.on_failure_processor_tag }}}'
        {{{/_ingest.on_failure_processor_tag}}}failed with message '{{{ _ingest.on_failure_message }}}'
  - set:
      field: event.kind
      tag: set_pipeline_error_to_event_kind
      value: pipeline_error
  - append:
      field: tags
      value: preserve_original_event
      allow_duplicates: false
```
Single-path pattern (linear pipeline)
Use this pattern when one parser flow handles all events. Combine the standard opening (ECS version, optional CEL-only block, JSE00001 rename/remove, parse from `event.original` without mutating it), middle processors with tags on every step, and the pipeline-level `on_failure` and conditional `append` for `preserve_original_event` shown above.

Example middle section (illustrative):

```yaml
- grok:
    field: event.original
    patterns:
      - '^...$'
    tag: parse_main
- date:
    field: some.time
    target_field: '@timestamp'
    formats: [ISO8601]
    tag: parse_timestamp
- convert:
    field: http.response.status_code
    type: long
    ignore_missing: true
    tag: convert_status
- user_agent:
    field: user_agent.original
    ignore_missing: true
    tag: enrich_user_agent
- geoip:
    field: source.ip
    target_field: source.geo
    ignore_missing: true
    tag: enrich_source_geo
- geoip:
    database_file: GeoLite2-ASN.mmdb
    field: source.ip
    target_field: source.as
    properties:
      - asn
      - organization_name
    ignore_missing: true
    tag: enrich_source_asn
- rename:
    field: source.as.asn
    target_field: source.as.number
    ignore_missing: true
    tag: rename_source_asn
- rename:
    field: source.as.organization_name
    target_field: source.as.organization.name
    ignore_missing: true
    tag: rename_source_as_org
- set:
    field: event.kind
    tag: set_event_kind
    value: event
- append:
    field: event.category
    tag: append_event_category_web
    value: web
- remove:
    field: temp
    ignore_missing: true
    tag: remove_temp
```
Branching pattern (router + sub-pipelines)
Use branching when event formats or object models diverge:

- format-based branching (for example JSON vs text)
- class/category-based branching (for example OCSF class/category routing)
- object-presence branching (`ctx.ocsf.user != null`)

Pattern:

```yaml
processors:
  - pipeline:
      name: '{{ IngestPipeline "pipeline_branch_json" }}'
      if: ctx.event?.original != null && ctx.event.original.startsWith('{')
      ignore_missing_pipeline: true
      tag: route_json
  - pipeline:
      name: '{{ IngestPipeline "pipeline_branch_text" }}'
      if: ctx.event?.original != null && !ctx.event.original.startsWith('{')
      ignore_missing_pipeline: true
      tag: route_text
```

In large integrations, keep `default.yml` as the router and put branch logic in files like `pipeline_object_<name>.yml` and `pipeline_category_<name>.yml`.

See `references/branching-patterns.md` for full patterns from `amazon_security_lake`.
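Object-presence and category branches follow the same router shape as the format branches. A minimal sketch, assuming hypothetical sub-pipelines `pipeline_object_user` and `pipeline_category_network`, and assuming the parsed OCSF category is available as a numeric `ctx.ocsf.category_uid` (category 4 is Network Activity in OCSF):

```yaml
processors:
  # Object-presence branch: runs only when the OCSF user object exists.
  - pipeline:
      name: '{{ IngestPipeline "pipeline_object_user" }}'  # hypothetical sub-pipeline
      if: ctx.ocsf?.user != null
      ignore_missing_pipeline: true
      tag: route_object_user
  # Category branch: assumes category_uid was parsed as a number.
  - pipeline:
      name: '{{ IngestPipeline "pipeline_category_network" }}'  # hypothetical sub-pipeline
      if: ctx.ocsf?.category_uid == 4
      ignore_missing_pipeline: true
      tag: route_category_network
```

Unlike the JSON/text pair, these two conditions are not mutually exclusive, so a single event can pass through both sub-pipelines; that is usually what object-level branching wants.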
Sub-pipeline routing for multi-log-type integrations
When a data stream receives multiple distinct log types (for example a firewall that emits traffic, auth, and DNS logs in the same stream), do not implement all parsing in a single monolithic `default.yml`. Use `default.yml` as a thin router that detects the log type and delegates to a dedicated sub-pipeline per type.
File layout
```text
elasticsearch/ingest_pipeline/
  default.yml          # router only: detects log type, calls sub-pipelines
  pipeline-<type>.yml  # one file per log type (e.g. pipeline-traffic.yml)
```
Router pattern in `default.yml`
Use the same `ecs.version`, JSE00001 `rename`/`remove` pair for `message`, and full pipeline-level `on_failure` as in the standard opening. The router only branches to sub-pipelines; it does not parse payloads.

```yaml
processors:
  - set:
      field: ecs.version
      tag: set_ecs_version
      value: '9.3.0'
  - rename:
      field: message
      tag: rename_message_to_event_original
      target_field: event.original
      ignore_missing: true
      if: ctx.event?.original == null
  - remove:
      field: message
      tag: remove_message
      ignore_missing: true
      if: ctx.event?.original != null
  - pipeline:
      name: '{{ IngestPipeline "pipeline-traffic" }}'
      if: 'ctx.event?.original != null && ctx.event.original.contains("TRAFFIC")'
      tag: route_traffic
  - pipeline:
      name: '{{ IngestPipeline "pipeline-auth" }}'
      if: 'ctx.event?.original != null && ctx.event.original.contains("AUTH")'
      tag: route_auth
  - pipeline:
      name: '{{ IngestPipeline "pipeline-dns" }}'
      if: 'ctx.event?.original != null && ctx.event.original.contains("DNS")'
      tag: route_dns
on_failure:
  - append:
      field: error.message
      value: >-
        Processor '{{{ _ingest.on_failure_processor_type }}}'
        {{{#_ingest.on_failure_processor_tag}}}with tag '{{{ _ingest.on_failure_processor_tag }}}'
        {{{/_ingest.on_failure_processor_tag}}}failed with message '{{{ _ingest.on_failure_message }}}'
  - set:
      field: event.kind
      tag: set_pipeline_error_to_event_kind
      value: pipeline_error
  - append:
      field: tags
      value: preserve_original_event
      allow_duplicates: false
```
Rules
- `default.yml` must contain only the shared opening processors (`ecs.version` and the JSE00001 `rename`/`remove` pair), routing logic, and `on_failure` handling; no field parsing.
- Each sub-pipeline handles parsing, ECS mapping, and categorization for its own log type.
- Each sub-pipeline must have its own `on_failure` block.
- Name sub-pipeline files `pipeline-<type>.yml`, where `<type>` matches the log type identifier used in the routing condition.
- Each log type gets its own pipeline test fixture file following the naming convention `test-<package>-<datastream>-<type>-sample.log`.
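For illustration, a sub-pipeline for the traffic branch of the router might look like the sketch below. The log line layout (`<timestamp> TRAFFIC <src ip> <dst ip> <action>`), the `_temp` field names, and the categorization value are assumptions for the example, not a real vendor format:

```yaml
# elasticsearch/ingest_pipeline/pipeline-traffic.yml
description: Parse TRAFFIC events (hypothetical firewall format).
processors:
  # Assumed layout: "<timestamp> TRAFFIC <source ip> <destination ip> <action>"
  - dissect:
      field: event.original
      pattern: '%{_temp.timestamp} TRAFFIC %{source.ip} %{destination.ip} %{event.action}'
      tag: dissect_traffic
  - date:
      field: _temp.timestamp
      target_field: '@timestamp'
      formats:
        - ISO8601
      tag: parse_traffic_timestamp
  - append:
      field: event.category
      value: network
      tag: append_event_category_network
  - remove:
      field: _temp
      ignore_missing: true
      tag: remove_temp
# Each sub-pipeline carries its own on_failure, mirroring the router's handler.
on_failure:
  - append:
      field: error.message
      value: >-
        Processor '{{{ _ingest.on_failure_processor_type }}}'
        {{{#_ingest.on_failure_processor_tag}}}with tag '{{{ _ingest.on_failure_processor_tag }}}'
        {{{/_ingest.on_failure_processor_tag}}}failed with message '{{{ _ingest.on_failure_message }}}'
  - set:
      field: event.kind
      tag: set_pipeline_error_to_event_kind
      value: pipeline_error
  - append:
      field: tags
      value: preserve_original_event
      allow_duplicates: false
```

Following the fixture convention, this branch would be exercised by its own `test-<package>-<datastream>-traffic-sample.log`.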
Processor ordering and performance
- run cheap existence checks before expensive operations
- drop early if records are out of scope
- prefer `dissect` over `grok` for stable delimited formats
- never use a `script` processor when a built-in processor can do the job: `set`, `rename`, `remove`, `append`, `convert`, `dissect`, `grok`, `gsub`, `lowercase`, `uppercase`, and `trim` are all faster than Painless and easier to review. See the cost tiers in `references/processor-cookbook.md` → Processor performance guide.
- use enrichment processors (`geoip`, `user_agent`) only when needed
- always anchor `grok` patterns with `^` and `$`; without anchors the regex engine scans the entire input string looking for a partial match, which is slow and can produce incorrect results on noisy log lines
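A short sketch of the first three points together with the anchoring rule. The input format is hypothetical: heartbeat lines start with `HEARTBEAT`, and real events look like `<timestamp> <level> user=<name> action=<verb>`:

```yaml
# Drop out-of-scope records first, before any parsing work is spent on them.
- drop:
    if: ctx.event?.original != null && ctx.event.original.startsWith('HEARTBEAT')
    tag: drop_heartbeat
# Stable space-delimited header (assumed format): dissect is cheaper than grok here.
- dissect:
    field: event.original
    pattern: '%{_temp.timestamp} %{log.level} %{_temp.body}'
    tag: dissect_header
# A cheap existence check guards the regex work; the pattern is anchored with ^ and $.
- grok:
    field: _temp.body
    patterns:
      - '^user=%{USERNAME:user.name} action=%{WORD:event.action}$'
    if: ctx._temp?.body != null
    tag: grok_body
```

The last dissect key (`_temp.body`) captures the remainder of the line, so only the variable part reaches the more expensive `grok`.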
Mustache template syntax in processor values
Ingest pipeline processors use Mustache templates to reference field values in `value`, `message`, and similar string parameters. Use triple braces `{{{field}}}` with single quotes, never double braces or double quotes:

```yaml
# CORRECT: triple braces, single quotes
- append:
    field: related.user
    value: '{{{user.target.email}}}'
    allow_duplicates: false
    if: ctx.user?.target?.email != null
    tag: append_user_target_email_to_related

# WRONG: double braces HTML-escape the value; double quotes are also not allowed
- append:
    field: related.user
    value: "{{user.target.email}}"
    allow_duplicates: false
    if: ctx.user?.target?.email != null
```

Why: Mustache double braces `{{...}}` HTML-encode the value (e.g., `&` becomes `&amp;`), which corrupts data in ingest pipelines. Triple braces `{{{...}}}` emit the raw value. Quoting stops YAML from reading a leading `{` as the start of a flow mapping, and single quotes additionally avoid the backslash-escape processing that double-quoted YAML strings apply.

**Exception:** `{{ IngestPipeline "..." }}` in `pipeline.name` is a Go template directive processed at build time, not a Mustache template; it correctly uses double braces.

Error handling essentials
Use pipeline-level `on_failure` as the main error reporting mechanism.

Recommended baseline (order matters):

- append contextual `error.message` first using `_ingest.on_failure_*` variables (full template in the standard opening example)
- set `event.kind: pipeline_error` (with a `tag` on the `set` processor)
- append `preserve_original_event` to `tags` when you need to retain the failed document for triage
- give every processor a `tag` (not only processors that can fail)

Use processor-level `on_failure` for local cleanup or fallback parsing, not as the primary global error message path.

See `references/error-handling-patterns.md` for full examples and tradeoffs (`ignore_failure`, `fail`, processor-level `on_failure`).
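As a sketch of processor-level `on_failure` used for local cleanup, assume a vendor port value that is sometimes non-numeric (for example `-`). The processor-level handler removes the bad value so the rest of the event still indexes; because the failure is handled locally, the pipeline-level `on_failure` does not fire for it and processing continues with the next processor:

```yaml
- convert:
    field: source.port
    type: long
    ignore_missing: true
    tag: convert_source_port
    on_failure:
      # Local cleanup only (assumed non-numeric port values): drop the bad value, keep the event.
      - remove:
          field: source.port
          ignore_missing: true
          tag: remove_invalid_source_port
```

Whether silently discarding such a value is acceptable is a per-integration decision; `references/error-handling-patterns.md` covers the tradeoffs.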
`event.original` handling (JSE00001)
The `elastic-package build` validator enforces that pipelines correctly handle the `message` to `event.original` rename. This check is known as JSE00001. New packages must comply; some legacy packages exclude it via `validation.yml`.

Required two-processor pattern
Every pipeline that consumes a `message` field must include both processors (typically after `ecs.version` and after any CEL-only `remove`/`terminate` steps when applicable):

```yaml
- rename:
    field: message
    tag: rename_message_to_event_original
    target_field: event.original
    ignore_missing: true
    description: Renames the original `message` field to `event.original` to store a copy of the original message. The `event.original` field is not touched if the document already has one; it may happen when Logstash sends the document.
    if: ctx.event?.original == null
- remove:
    field: message
    tag: remove_message
    ignore_missing: true
    description: The `message` field is no longer required if the document has an `event.original` field.
    if: ctx.event?.original != null
```

Step 1 (`rename`): moves `message` into `event.original`, but only when `event.original` is not already populated (idempotent when a prior pipeline or Logstash has already set it).

Step 2 (`remove`): removes the redundant `message` field when `event.original` is present (after rename or from an upstream producer).
Do NOT add an `event.original` removal processor at the end of the pipeline
Some existing integrations contain a `remove` processor that deletes `event.original` at the end of the pipeline when `preserve_original_event` is not in `tags`. This pattern is deprecated and must not be used in new pipelines. The removal of `event.original` for storage optimization is now handled by a separate final pipeline outside the integration. Do not copy this pattern from reference integrations that still have it; it is legacy.
Reference
The two-processor JSE00001 pattern (rename + remove of `message`) shown above is required and complete. Do not add any other processors that rename, copy, or remove `event.original` beyond those two.
Timezone handling (`tz_offset`)
For data streams that include the `tz_offset` manifest var (syslog streams where messages lack a timezone), set `event.timezone` from `_conf.tz_offset` early in the pipeline, before any date parsing:

```yaml
- set:
    field: event.timezone
    tag: set_event_timezone
    value: '{{{_conf.tz_offset}}}'
    if: ctx._conf?.tz_offset != null && ctx._conf.tz_offset != ''
```

This lets later `date` processors apply the correct timezone (through their `timezone` option) when parsing timestamps that have no timezone component.
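A minimal sketch of the consuming side, assuming the raw timestamp was already extracted into a hypothetical `_temp.timestamp` field in a zone-less `yyyy-MM-dd HH:mm:ss` format. The `date` processor's `timezone` option accepts a template, so it can read `event.timezone`; a second processor covers documents where no offset was configured:

```yaml
# _temp.timestamp and its format are assumptions for this sketch.
- date:
    field: _temp.timestamp
    target_field: '@timestamp'
    formats:
      - 'yyyy-MM-dd HH:mm:ss'
    timezone: '{{{event.timezone}}}'
    if: ctx._temp?.timestamp != null && ctx.event?.timezone != null
    tag: parse_timestamp_with_tz_offset
# No configured offset: the date processor falls back to its default timezone (UTC).
- date:
    field: _temp.timestamp
    target_field: '@timestamp'
    formats:
      - 'yyyy-MM-dd HH:mm:ss'
    if: ctx._temp?.timestamp != null && ctx.event?.timezone == null
    tag: parse_timestamp_default_tz
```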
Syslog structured data (RFC 5424 SD-ELEMENT) parsing
For vendor `key=value` payloads and RFC 5424 SD-ELEMENT blocks, three strategies are available: KV with `trim_value` (simplest, Strategy 1), `SYSLOG5424SD` grok + KV with regex splits (Strategy 2), and Painless for edge cases with embedded equals signs or mixed quoting (Strategy 3).

Prefer Strategy 1 or 2; use Painless only when KV edge cases demand it.

See `references/grok-recipes.md` → Syslog structured data strategies for full code examples, key settings, and reference implementations.
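A minimal Strategy 1 sketch, assuming the `key="value"` pairs have already been isolated into a hypothetical `_temp.sd` field and should land under a hypothetical `acme.firewall` vendor namespace. Splitting on a single space is only safe when no quoted value contains a space; anything more irregular belongs to Strategy 2 or 3:

```yaml
# Assumed input in _temp.sd: action="allow" rule="web-01" bytes="512"
- kv:
    field: _temp.sd
    target_field: acme.firewall   # hypothetical vendor namespace
    field_split: ' '
    value_split: '='
    trim_value: '"'
    ignore_missing: true
    tag: kv_structured_data
```

Keys are kept exactly as the vendor emits them, in line with the vendor field naming rules.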
Keyword fields delivered as numbers
Fields that carry identifiers, protocol codes, or other opaque values must be declared as `keyword` in `fields.yml`, even when the source data delivers them as numbers. Common examples:

- network protocol numbers (`network.iana_number`)
- port numbers used as identifiers
- error codes, result codes, status codes
- SNMP OIDs, event IDs, object class codes

Do not add a `convert` processor to stringify these values. Elasticsearch silently coerces numbers to strings for `keyword` fields at index time, so the pipeline can pass the raw numeric value through unchanged.

The field declaration in `fields.yml`:

```yaml
- name: network.iana_number
  type: keyword
  description: IANA protocol number.
```

Because the test runner compares raw value types against declared field types, it will flag `6` (long) as a mismatch for `keyword`. Declare the field in `numeric_keyword_fields` in the pipeline test config so the runner accepts the numeric representation without requiring the fixture to artificially stringify the value. See `integration-testing/references/pipeline-testing.md` for the config syntax.
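For orientation only, a pipeline test config entry might look like the fragment below. The config file name (the fixture name plus `-config.yml`) is an assumption here; `integration-testing/references/pipeline-testing.md` remains the authority on the exact syntax:

```yaml
# _dev/test/pipeline/test-<package>-<datastream>-sample.log-config.yml (assumed file name)
numeric_keyword_fields:
  - network.iana_number
```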
Vendor field naming
Preserve vendor field names exactly as they appear in the source. Do not rename, reformat, or normalize vendor-specific field names; the only permitted renaming is mapping a vendor field to an ECS field (e.g. renaming `src_ip` to `source.ip`). When a vendor field has no ECS equivalent, keep it under a vendor-namespaced prefix (e.g. `vendor.product.field_name`) using the original name from the source.
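A sketch of both cases, assuming the payload was parsed into `json` and using the `src_ip` example plus a hypothetical vendor field `fwRuleUUID` under an assumed `acme.firewall` namespace. The second rename keeps the vendor's original spelling and casing rather than converting it to snake_case:

```yaml
# Vendor field with an ECS equivalent: map it to ECS.
- rename:
    field: json.src_ip
    target_field: source.ip
    ignore_missing: true
    tag: rename_src_ip_to_source_ip
# Vendor field with no ECS equivalent (hypothetical): same name, vendor namespace.
- rename:
    field: json.fwRuleUUID
    target_field: acme.firewall.fwRuleUUID
    ignore_missing: true
    tag: rename_fw_rule_uuid
```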
`related.ip` population
Every IP address present in the document must be appended to `related.ip`. This includes source, destination, client, server, host, and any other IP fields that apply to the event type.

Use one `append` processor per IP field, guarded by an `if` condition so it is a no-op when the field is absent. Place these processors after all IP fields have been set (for example after `geoip`, `convert`, and any ECS rename steps) and before the `remove` cleanup processors.

```yaml
- append:
    field: related.ip
    tag: append_source_ip_to_related
    value: '{{{source.ip}}}'
    allow_duplicates: false
    if: ctx.source?.ip != null
- append:
    field: related.ip
    tag: append_destination_ip_to_related
    value: '{{{destination.ip}}}'
    allow_duplicates: false
    if: ctx.destination?.ip != null
# repeat the same pattern for client.ip, server.ip, host.ip, and any other IP fields the pipeline sets
```

Rules:

- Use `allow_duplicates: false` on every append to avoid repeated values.
- Add an `if` guard on every processor so it skips fields absent in the event.
- Add one `append` per IP field the pipeline actually writes; do not add processors for fields the pipeline never sets.
Painless script best practices
Before writing any processor, you MUST check whether a built-in processor can do the same job. is the slowest general-purpose processor (Painless compilation + per-document execution). The following operations have dedicated processors that are cheaper and easier to review:
scriptscript| If you need to … | Use this processor, not |
|---|---|
| Copy, move, or rename a field | |
| Set a constant or derived value | |
| Add a value to a list | |
| Change a field's type | |
| Extract a substring from a delimited string | |
| Extract a substring with regex | |
| Replace characters in a string | |
| Normalize case | |
Only reach for when no combination of built-in processors can express the logic — for example, ECS categorization lookup tables with 5+ entries (Pattern A), complex conditional arithmetic, or edge-case string parsing that and genuinely cannot handle.
Case-insensitive comparisons: use `equalsIgnoreCase()` when casing is unpredictable
Syslog and vendor devices are often inconsistent about casing. Painless scripts that compare vendor-specific free-text fields should therefore use `equalsIgnoreCase()` rather than `==`. Apply this judgement in context, not as a blanket rule:
- Use `equalsIgnoreCase()` when the vendor field value may vary in casing between devices, firmware versions, or log sources. Examples include action fields like `allow`/`Allow`/`ALLOW`, severity strings, and free-text status fields.
- Use `==` when the API or spec defines a fixed lowercase enum and the values are always delivered as specified. Examples include ECS categorization fields and API response fields documented as lowercase-only enums. Adding `equalsIgnoreCase()` to fixed-enum fields adds noise without value.
```painless
// Correct for unpredictable vendor casing
if (ctx.vendor != null && ctx.vendor.action != null && ctx.vendor.action.equalsIgnoreCase('allow')) { ... }
// Correct for a fixed lowercase API enum: == is appropriate here
if (ctx.json != null && ctx.json.event_type == 'login') { ... }
// Incorrect for unpredictable casing: misses "Allow" and "ALLOW"
if (ctx.vendor != null && ctx.vendor.action == 'allow') { ... }
```
Access `ctx` directly in script bodies: no null-safe operators
In `script` processor `source` blocks, access `ctx` fields directly. Use explicit null checks instead of the null-safe operator `?.`.
```painless
// Correct: direct access with explicit null check
if (ctx.source != null && ctx.source.ip != null) { ... }
// Incorrect: null-safe operator in a script body
if (ctx.source?.ip != null) { ... }
```
Note: the null-safe operator `?.` is acceptable in processor `if` conditions (YAML), which are a different Painless execution context:
```yaml
- append:
    field: related.ip
    tag: append_source_ip_to_related
    value: '{{{source.ip}}}'
    allow_duplicates: false
    if: ctx.source?.ip != null
```
Other rules
- Every `script` processor must have a `tag` and a `description`.
- Keep scripts short and scoped. Move complex logic into helper variables inside the script, not across multiple script processors.
- Do not use `script` when built-in processors suffice. See the mandatory checklist table at the top of this section.
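For contrast, here is a sketch of a `script` processor that follows the rules above. It has a `tag` and a `description`, uses direct `ctx` access with explicit null checks, and does arithmetic that no built-in processor offers. The choice of fields (summing `source.bytes` and `destination.bytes` into `network.bytes`) is an illustrative assumption. The sketch also presumes both fields were already converted to numbers earlier in the pipeline; a string value makes the `long` assignments fail rather than silently concatenate.

```yaml
- script:
    tag: script_sum_network_bytes
    description: Sum source.bytes and destination.bytes into network.bytes.
    lang: painless
    source: |-
      // Direct ctx access with explicit null checks; no ?. in the script body.
      if (ctx.source == null || ctx.source.bytes == null) {
        return;
      }
      if (ctx.destination == null || ctx.destination.bytes == null) {
        return;
      }
      // Helper variables keep the logic inside this single processor.
      long sourceBytes = ctx.source.bytes;
      long destinationBytes = ctx.destination.bytes;
      if (ctx.network == null) {
        ctx.network = new HashMap();
      }
      ctx.network.bytes = sourceBytes + destinationBytes;
```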
ECS categorization mapping
When mapping source event types or actions to `event.category`, `event.type`, `event.outcome`, and `event.action`, use the patterns in `references/processor-cookbook.md` → ECS categorization mapping patterns:
- Pattern A (`script` with `params` lookup table): recommended for 5+ mappings. Keeping the mapping data in `params` enables Painless compilation caching and keeps the script body generic.
- Pattern B (`set` processors with conditionals): for fewer than 5 mappings, where a script is overkill.
- Pattern C (sub-pipeline): for 100+ mappings, extract the categorization into a dedicated sub-pipeline file.
Do NOT use bulk `append` processors (2 per event type = 50+ processors for 25 types). Do NOT use inline Painless `if`/`else` chains without `params` either, because they defeat compilation caching. Both are explicit anti-patterns; see the cookbook for details.
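Here is a sketch of Pattern A that assumes a hypothetical `vendor.event_type` source field. The real table and its entries would come from the vendor's documentation. Because the lookup data lives in `params`, the script source is identical for every document, and only the table changes when mappings are added.

```yaml
- script:
    tag: script_map_vendor_event_type_to_ecs
    description: Map vendor.event_type to ECS categorization fields via a params lookup table.
    lang: painless
    params:
      login_success:
        category: [authentication]
        type: [start]
        outcome: success
      login_failure:
        category: [authentication]
        type: [start]
        outcome: failure
      logout:
        category: [authentication]
        type: [end]
      file_created:
        category: [file]
        type: [creation]
      file_deleted:
        category: [file]
        type: [deletion]
    source: |-
      if (ctx.vendor == null || ctx.vendor.event_type == null) {
        return;
      }
      def mapping = params.get(ctx.vendor.event_type);
      if (mapping == null) {
        return;
      }
      if (ctx.event == null) {
        ctx.event = new HashMap();
      }
      // Copy the lists so later processors cannot mutate the shared params.
      ctx.event.category = new ArrayList(mapping.category);
      ctx.event.type = new ArrayList(mapping.type);
      if (mapping.outcome != null) {
        ctx.event.outcome = mapping.outcome;
      }
```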
Grok best practices
- prefer `dissect` when structure is fixed
- use simpler grok patterns where possible
- always anchor grok patterns with `^` and `$`:
  ```yaml
  # Correct: anchored, fails fast on non-matching lines
  patterns:
    - '^%{IPORHOST:source.ip} %{USER:user.name} %{DATA:message}$'
  # Incorrect: unanchored, scans the whole string for a partial match
  patterns:
    - '%{IPORHOST:source.ip} %{USER:user.name} %{DATA:message}'
  ```
- avoid unnecessary backtracking-heavy custom regex
- add a `tag` to every grok (and every other) processor
For grok syntax (three expression forms, inline regex, type coercion, `pattern_definitions`), syslog header splitting recipes, and common mistakes, see `references/grok-recipes.md`.
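When the layout is fixed, the same access-line shape as the grok example can be parsed with `dissect`, with no regex at all. This sketch assumes the raw line was already copied to `event.original` earlier in the pipeline. The tag name is a placeholder.

```yaml
# Fixed, single-space-delimited layout: no regex, no backtracking.
# The last key (%{message}) captures the remainder of the line.
- dissect:
    field: event.original
    pattern: '%{source.ip} %{user.name} %{message}'
    tag: dissect_access_line
```

Unlike the grok version, `dissect` does not check that `source.ip` is an IP address. A later `convert` processor with `type: ip` could add that validation if the data needs it.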
Prohibited patterns
These patterns exist in many legacy integrations but must not be used in new or updated pipelines. Do not copy them from reference integrations.
Never set `event.ingested`
Elasticsearch manages the `event.ingested` field outside the integration pipeline. Do not add a `set` processor for `event.ingested` in any integration pipeline. This includes patterns like:
```yaml
# PROHIBITED: do not use
- set:
    field: event.ingested
    value: '{{{_ingest.timestamp}}}'
```
The pipeline **should** set `@timestamp` from the original event's timestamp. When the source data contains multiple timestamps, map them as follows:
- **`@timestamp`**: the primary event timestamp parsed from the source data. This is required.
- **`event.created`**: when the event was first created or recorded by the source system (if different from `@timestamp`).
- **`event.start`**: when an activity or period began (e.g., session start, connection start).
- **`event.end`**: when an activity or period ended (e.g., session end, connection close).
If a source timestamp does not match the semantics of `event.created`, `event.start`, or `event.end`, map it to a custom field under the vendor namespace with `type: date` in `fields.yml` and use a `date` processor with the appropriate `target_field`.
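Here is a sketch of that mapping. It assumes a hypothetical source with three timestamps: an ISO 8601 `vendor.time`, an epoch-millisecond `vendor.session_start`, and a vendor-specific `vendor.policy_updated` declared as `type: date` in `fields.yml`.

```yaml
# Primary event time -> @timestamp (the date processor's default target_field).
- date:
    field: vendor.time
    formats:
      - ISO8601
    tag: date_vendor_time_to_timestamp
    if: ctx.vendor?.time != null
# Start of the activity -> event.start.
- date:
    field: vendor.session_start
    target_field: event.start
    formats:
      - UNIX_MS
    tag: date_vendor_session_start
    if: ctx.vendor?.session_start != null
# No ECS equivalent: parse in place under the vendor namespace.
- date:
    field: vendor.policy_updated
    target_field: vendor.policy_updated
    formats:
      - ISO8601
    tag: date_vendor_policy_updated
    if: ctx.vendor?.policy_updated != null
```

If a source sends the same timestamp in more than one format, list each format under `formats`. The `date` processor tries them in order.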
Never use `preserve_duplicate_custom_fields`
The `preserve_duplicate_custom_fields` tag pattern is a legacy anti-pattern. In this pattern, source fields are copied to ECS fields using `set` with `copy_from`, and the originals are conditionally retained. Do not use it in any new or updated pipeline. Do not add a `preserve_duplicate_custom_fields` manifest variable, tag, or conditional logic.
Instead, follow these field mapping rules:
- When a source field maps to an ECS field, use `rename` to move it directly. The source field is removed and no duplicate exists.
- When a type conversion is needed (e.g., string to date, string to long), use the appropriate processor (`date`, `convert`, or `set` with `copy_from`) to populate the ECS target field. Then `remove` the source field in the cleanup section at the end of the pipeline.
- Never design a pipeline that needs to preserve both the original vendor field and the ECS copy. The ECS field is the canonical location.
If you encounter the `preserve_duplicate_custom_fields` pattern in a reference integration, ignore it; it is legacy.
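Here is a minimal sketch of those rules with hypothetical `vendor.src_ip` and `vendor.dst_port` fields. The IP is a direct mapping, so it uses `rename`. The port needs a type change, so it is converted into the ECS field and the original is removed in the cleanup section.

```yaml
# Direct ECS mapping: rename moves the value, so no duplicate remains.
- rename:
    field: vendor.src_ip
    target_field: source.ip
    tag: rename_vendor_src_ip_to_source_ip
    ignore_missing: true
# Type change needed: convert into the ECS field; the original stays for now.
- convert:
    field: vendor.dst_port
    target_field: destination.port
    type: long
    tag: convert_vendor_dst_port_to_destination_port
    ignore_missing: true
# ... other processors ...
# Cleanup section at the end of the pipeline: drop the converted-from original.
- remove:
    field:
      - vendor.dst_port
    tag: remove_converted_vendor_fields
    ignore_missing: true
```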
Never add an `event.original` removal processor at the end
As documented in the JSE00001 section above: do not add a `remove` processor for `event.original` at the end of the pipeline. A separate final pipeline handles this.
References
- `references/processor-cookbook.md`: processor selection, parsing/normalization/enrichment examples, ECS categorization mapping patterns (Pattern A/B/C + anti-patterns)
- `references/branching-patterns.md`
- `references/error-handling-patterns.md`
- `references/grok-recipes.md`: grok syntax, type coercion, syslog header recipes, common mistakes, pattern library link
- `references/builder-subagent-guidance.md`: always-embedded subagent operating manual covering scope boundaries, skill-load sequence, input data paths (CEL-first vs Direct), 9-step pipeline build workflow, "review generated output, never hand-edit expected JSON", and the reporting contract