Loading...
Loading...
Use when designing or modifying Elasticsearch ingest pipelines, including single-path parsing, branching logic, sub-pipelines, enrichment processors, and robust on_failure handling.
npx skill4agent add elastic/integration-skills ingest-pipelineselastic/integrationselasticsearch/ingest_pipeline/default.ymlgrokdissectjsonkvdateconvertpipelineon_failureecs-field-mappingselastic-package-cliintegration-testingreferences/pipeline-testing.mddata_stream/<stream>/elasticsearch/ingest_pipeline/default.ymldescriptionprocessorson_failuredefault.ymlprocessors9.3.0 - set:
field: ecs.version
tag: set_ecs_version
value: '9.3.0'renamesetcopy_fromrenametagmessageevent.originalremoveorganizationdivisionteamignore_missing: trueifterminatectx.error?.message != null && ctx.message == null && ctx.event?.original == nullcreate-integrationevent.originaljsoncsvgrokevent.originalevent.originaljson_temp.*description: Parse <dataset> events.
processors:
# Single-path pipeline skeleton. Processor options are nested under their
# processor key; sequence dashes stay flush with the parent key.
- set:
    field: ecs.version
    tag: set_ecs_version
    value: '9.3.0'
# --- CEL input only (omit for log/syslog-only streams) ---
# Runs only when all three fields are present as strings.
- remove:
    field:
    - organization
    - division
    - team
    ignore_missing: true
    if: ctx.organization instanceof String && ctx.division instanceof String && ctx.team instanceof String
    tag: remove_agentless_tags
    description: >-
      Removes the fields added by Agentless as metadata,
      as they can collide with ECS fields.
# Stop early when the document carries an error message but neither
# `message` nor `event.original` to parse.
- terminate:
    tag: data_collection_error
    if: ctx.error?.message != null && ctx.message == null && ctx.event?.original == null
    description: error message set and no data to process.
# --- end CEL-only ---
# The next two processors have opposite conditions at the time each one
# runs: the rename fires only if `event.original` is absent, and the remove
# then drops any `message` left over when `event.original` is present.
- rename:
    field: message
    tag: rename_message_to_event_original
    target_field: event.original
    ignore_missing: true
    description: Renames the original `message` field to `event.original` to store a copy of the original message. The `event.original` field is not touched if the document already has one; it may happen when Logstash sends the document.
    if: ctx.event?.original == null
- remove:
    field: message
    tag: remove_message
    ignore_missing: true
    description: The `message` field is no longer required if the document has an `event.original` field.
    if: ctx.event?.original != null
# Parse (always read from event.original; do not modify event.original)
- json:
    field: event.original
    target_field: json
    tag: parse_json
    if: ctx.event?.original != null
# ... normalize, enrich, ECS categorization, cleanup ...
# Tag documents that picked up an error message so the original event is
# kept; `allow_duplicates: false` avoids adding the tag twice.
- append:
    field: tags
    value: preserve_original_event
    allow_duplicates: false
    if: ctx.error?.message != null
on_failure:
- append:
field: error.message
value: >-
Processor '{{{ _ingest.on_failure_processor_type }}}'
{{{#_ingest.on_failure_processor_tag}}}with tag '{{{ _ingest.on_failure_processor_tag }}}'
{{{/_ingest.on_failure_processor_tag}}}failed with message '{{{ _ingest.on_failure_message }}}'
- set:
field: event.kind
tag: set_pipeline_error_to_event_kind
value: pipeline_error
- append:
field: tags
value: preserve_original_event
allow_duplicates: falseevent.originalon_failureappendpreserve_original_event - grok:
field: event.original
patterns:
- '^...$'
tag: parse_main
- date:
field: some.time
target_field: '@timestamp'
formats: [ISO8601]
tag: parse_timestamp
- convert:
field: http.response.status_code
type: long
ignore_missing: true
tag: convert_status
- user_agent:
field: user_agent.original
ignore_missing: true
tag: enrich_user_agent
- geoip:
field: source.ip
target_field: source.geo
ignore_missing: true
tag: enrich_source_geo
- geoip:
database_file: GeoLite2-ASN.mmdb
field: source.ip
target_field: source.as
properties:
- asn
- organization_name
ignore_missing: true
tag: enrich_source_asn
- rename:
field: source.as.asn
target_field: source.as.number
ignore_missing: true
tag: rename_source_asn
- rename:
field: source.as.organization_name
target_field: source.as.organization.name
ignore_missing: true
tag: rename_source_as_org
- set:
field: event.kind
tag: set_event_kind
value: event
- append:
field: event.category
tag: append_event_category_web
value: web
- remove:
field: temp
ignore_missing: true
tag: remove_tempctx.ocsf.user != nullprocessors:
- pipeline:
name: '{{ IngestPipeline "pipeline_branch_json" }}'
if: ctx.event?.original != null && ctx.event.original.startsWith('{')
ignore_missing_pipeline: true
tag: route_json
- pipeline:
name: '{{ IngestPipeline "pipeline_branch_text" }}'
if: ctx.event?.original != null && !ctx.event.original.startsWith('{')
ignore_missing_pipeline: true
tag: route_textdefault.ymlpipeline_object_<name>.ymlpipeline_category_<name>.ymlreferences/branching-patterns.mdamazon_security_lakedefault.ymldefault.ymlelasticsearch/ingest_pipeline/
default.yml # router only — detects log type, calls sub-pipelines
pipeline-<type>.yml # one file per log type (e.g. pipeline-traffic.yml)default.ymlecs.versionrenameremovemessageon_failureprocessors:
# Router stage: record the ECS version, settle `message` vs.
# `event.original`, then hand the document to a per-type sub-pipeline.
# Processor options are nested under their processor key.
- set:
    field: ecs.version
    tag: set_ecs_version
    value: '9.3.0'
# Move `message` into `event.original` only when the latter is absent.
- rename:
    field: message
    tag: rename_message_to_event_original
    target_field: event.original
    ignore_missing: true
    if: ctx.event?.original == null
# Drop any leftover `message` once `event.original` exists.
- remove:
    field: message
    tag: remove_message
    ignore_missing: true
    if: ctx.event?.original != null
# Each route is an independent substring check on `event.original`; the
# conditions are not mutually exclusive, so a document containing more than
# one marker is passed to every matching sub-pipeline in order.
- pipeline:
    name: '{{ IngestPipeline "pipeline-traffic" }}'
    if: 'ctx.event?.original != null && ctx.event.original.contains("TRAFFIC")'
    tag: route_traffic
- pipeline:
    name: '{{ IngestPipeline "pipeline-auth" }}'
    if: 'ctx.event?.original != null && ctx.event.original.contains("AUTH")'
    tag: route_auth
- pipeline:
    name: '{{ IngestPipeline "pipeline-dns" }}'
    if: 'ctx.event?.original != null && ctx.event.original.contains("DNS")'
    tag: route_dns
on_failure:
- append:
field: error.message
value: >-
Processor '{{{ _ingest.on_failure_processor_type }}}'
{{{#_ingest.on_failure_processor_tag}}}with tag '{{{ _ingest.on_failure_processor_tag }}}'
{{{/_ingest.on_failure_processor_tag}}}failed with message '{{{ _ingest.on_failure_message }}}'
- set:
field: event.kind
tag: set_pipeline_error_to_event_kind
value: pipeline_error
- append:
field: tags
value: preserve_original_event
allow_duplicates: falsedefault.ymlon_failureon_failurepipeline-<type>.yml<type>test-<package>-<datastream>-<type>-sample.logdissectgrokscriptsetrenameremoveappendconvertdissectgrokgsublowercaseuppercasetrimreferences/processor-cookbook.mdgeoipuser_agentgrok^$valuemessage{{{field}}}# CORRECT — triple braces, single quotes
- append:
field: related.user
value: '{{{user.target.email}}}'
allow_duplicates: false
if: ctx.user?.target?.email != null
# WRONG — double braces HTML-escape the value; double quotes used instead of single quotes
- append:
field: related.user
value: "{{user.target.email}}"
allow_duplicates: false
if: ctx.user?.target?.email != null{{...}}&&{{{...}}}{{ IngestPipeline "..." }}pipeline.nameon_failureerror.message_ingest.on_failure_*event.kind: pipeline_errortagsetpreserve_original_eventtagstagon_failurereferences/error-handling-patterns.mdignore_failurefailon_failureelastic-package buildmessageevent.originalvalidation.ymlmessageecs.versionremoveterminate- rename:
field: message
tag: rename_message_to_event_original
target_field: event.original
ignore_missing: true
description: Renames the original `message` field to `event.original` to store a copy of the original message. The `event.original` field is not touched if the document already has one; it may happen when Logstash sends the document.
if: ctx.event?.original == null
- remove:
field: message
tag: remove_message
ignore_missing: true
description: The `message` field is no longer required if the document has an `event.original` field.
if: ctx.event?.original != nullrenamemessageevent.originalevent.originalremovemessageevent.originalevent.originalremoveevent.originalpreserve_original_eventtagsevent.originalmessageevent.originaltz_offsettz_offsetevent.timezone_conf.tz_offset- set:
field: event.timezone
tag: set_event_timezone
value: '{{{_conf.tz_offset}}}'
if: ctx._conf?.tz_offset != null && ctx._conf.tz_offset != ''key=valuetrim_valueSYSLOG5424SDreferences/grok-recipes.mdkeywordfields.ymlnetwork.iana_numberconvertkeywordfields.yml- name: network.iana_number
type: keyword
description: IANA protocol number.6keywordnumeric_keyword_fieldsintegration-testing/references/pipeline-testing.mdsrc_ipsource.ipvendor.product.field_namerelated.ipappendignore_missing: truegeoipconvertremove - append:
field: related.ip
tag: append_source_ip_to_related
value: '{{{source.ip}}}'
allow_duplicates: false
if: ctx.source?.ip != null
- append:
field: related.ip
tag: append_destination_ip_to_related
value: '{{{destination.ip}}}'
allow_duplicates: false
if: ctx.destination?.ip != null
# repeat the same pattern for client.ip, server.ip, host.ip, and any other IP fields the pipeline setsallow_duplicates: falseifappendscriptscript| If you need to … | Use this processor, not |
|---|---|
| Copy, move, or rename a field | `rename`, or `set` with `copy_from` |
| Set a constant or derived value | `set` |
| Add a value to a list | `append` |
| Change a field's type | `convert` |
| Extract a substring from a delimited string | `dissect` |
| Extract a substring with regex | `grok` |
| Replace characters in a string | `gsub` |
| Normalize case | `lowercase` / `uppercase` |
scriptdissectgrokequalsIgnoreCase()equalsIgnoreCase()==equalsIgnoreCase()allow/Allow/ALLOW==equalsIgnoreCase()// Correct for unpredictable vendor casing
if (ctx.vendor?.action?.equalsIgnoreCase('allow')) { ... }
// Correct for a fixed lowercase API enum — == is appropriate here
if (ctx.json?.event_type == 'login') { ... }
// Incorrect for unpredictable casing — breaks on "Allow", "ALLOW"
if (ctx.vendor?.action == 'allow') { ... }ctxscriptsourcectx?.// Correct — direct access with explicit null check
if (ctx.source != null && ctx.source.ip != null) { ... }
// Incorrect — null-safe operator in a script body
if (ctx.source?.ip != null) { ... }?.if- append:
field: related.ip
value: '{{{source.ip}}}'
if: ctx.source?.ip != nullscripttagdescriptionscriptevent.categoryevent.typeevent.outcomeevent.actionreferences/processor-cookbook.mdparamsparamssetappendifelseparamsdissect^$# Correct — anchored, fails fast on non-matching lines
patterns:
- '^%{IPORHOST:source.ip} %{USER:user.name} %{DATA:message}$'
# Incorrect — unanchored, scans the whole string for a partial match
patterns:
- '%{IPORHOST:source.ip} %{USER:user.name} %{DATA:message}'tagpattern_definitionsreferences/grok-recipes.mdevent.ingestedevent.ingestedsetevent.ingested# PROHIBITED — do not use
- set:
field: event.ingested
value: '{{{_ingest.timestamp}}}'@timestamp@timestampevent.created@timestampevent.startevent.endevent.createdevent.startevent.endtype: datefields.ymldatetarget_fieldpreserve_duplicate_custom_fieldspreserve_duplicate_custom_fieldssetcopy_frompreserve_duplicate_custom_fieldsrenamedateconvertsetcopy_fromremoveevent.originalremoveevent.originalreferences/processor-cookbook.mdreferences/branching-patterns.mdreferences/error-handling-patterns.mdreferences/grok-recipes.mdreferences/builder-subagent-guidance.md