How to leverage ingest pipelines to transform data transparently in Elasticsearch

By Opster Expert Team - Kofi

Updated: Jan 28, 2024


What ingest pipelines are used for

Do you have some adjustments you’d like to make to your data, but would like to use a method that is more lightweight than Logstash or some other data parsing tool? Ingest pipelines may just be what you’re looking for. 

With ingest pipelines you can manipulate your data to fit your needs without much overhead. Ingest pipelines sit within the Elasticsearch node (the ingest node, if you’ve defined one), and will perform a set of alterations on your data that you define. These alterations are called “processors” and each one executes a specific task for you. A set of these processors is called an ingest pipeline and when defined, can be used to manipulate data upon ingestion.

Let’s take the following structure of data as an example:

{
  "id": 1,
  "first_name": "John",
  "last_name": "Smith",
  "email": "jsmith@elastic.com",
  "ip_address": "100.1.193.2",
  "activities": "Soccer, Cooking, Eating"
}, 
{
  "id": 2,
  "first_name": "Trevor",
  "last_name": "Noah",
  "email": "tnoah@funny.tv",
  "ip_address": "122.100.4.22",
  "activities": "Joking, Stand-Up, Sleeping"
}

You can use ingest pipelines to alter the data above in the following ways:

  • Rename fields:
    • E.g. change `first_name` to `firstName`
  • Remove fields:
    • E.g. remove the field `email`
  • Split fields to turn a string value into an array using a separator:
    • E.g. turn `activities` from `"Soccer, Cooking, Eating"` into `["Soccer", "Cooking", "Eating"]`
  • Do a GeoIP lookup on a field
  • Run a script for the most flexibility (with Painless syntax):
    • E.g. add two fields together and divide by another, or encode sensitive data
  • Convert fields:
    • E.g. change a field’s type from a string to an int
  • Enrich documents by doing a lookup to append additional information to each event:
    • See the enrich processor section below.
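
As a rough sketch of how a few of these alterations could look, the request below runs the first sample document through a throwaway pipeline using the `_simulate` endpoint (covered in more detail later in this guide). The `full_name` field is a hypothetical name chosen for illustration, and the separator is written as a regular expression so that the space after each comma is dropped:

```console
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Sketch: rename, remove, split and script on the sample document",
    "processors": [
      { "rename": { "field": "first_name", "target_field": "firstName" } },
      { "remove": { "field": "email" } },
      { "split": { "field": "activities", "separator": ",\\s*" } },
      {
        "script": {
          "lang": "painless",
          "source": "ctx.full_name = ctx.firstName + ' ' + ctx.last_name"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "id": 1,
        "first_name": "John",
        "last_name": "Smith",
        "email": "jsmith@elastic.com",
        "ip_address": "100.1.193.2",
        "activities": "Soccer, Cooking, Eating"
      }
    }
  ]
}
```

Processors run in the order they are listed, which is why the script can refer to `firstName` after the rename has taken place.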

If you want to import data in a rawer form, such as the following two samples:

2021-05-11T20:14:25.003Z mytest.example.com There seems to be a problem

2021-05-11T20:14:25.003Z,mytest.example.com,There seems to be a problem

You can also use ingest pipelines to:

  • Parse out fields with grok or dissect:
    • E.g. store “2021-05-11…” in a field called “date”; “mytest.example.com” in a field called “origin”; and “There seems to be a problem” in “raw_message”
  • Parse out a CSV into fields:
    • E.g. in the second sample, use a comma as the separator and name the first value “date”; the second “origin”; the third “raw_message”
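
A minimal sketch of both options, assuming the raw line arrives in a field called `message` (the field name is an assumption; use whatever field holds your raw text). The first request uses the dissect processor on the space-separated sample, and the second uses the csv processor on the comma-separated one:

```console
# Space-separated sample: dissect splits on the literal spaces in the pattern,
# and the last key captures the rest of the line
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{date} %{origin} %{raw_message}"
        }
      }
    ]
  },
  "docs": [
    { "_source": { "message": "2021-05-11T20:14:25.003Z mytest.example.com There seems to be a problem" } }
  ]
}

# Comma-separated sample: the csv processor uses a comma as its default separator
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "csv": {
          "field": "message",
          "target_fields": ["date", "origin", "raw_message"]
        }
      }
    ]
  },
  "docs": [
    { "_source": { "message": "2021-05-11T20:14:25.003Z,mytest.example.com,There seems to be a problem" } }
  ]
}
```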

You can find an exhaustive list of all available processors in this guide.

How to create ingest pipelines

Ingesting documents is done in an Elasticsearch node that is given an “ingest” role (if you haven’t adjusted your node to have a certain role, then it’s able to ingest by default).

Once you have created an ingest pipeline, you can specify which pipeline the data should run through in several ways:

  • In your bulk POST to Elasticsearch, via the `pipeline` query parameter, or
  • Through an index template that sets a default pipeline (the `index.default_pipeline` setting) for all indices that match a certain pattern.
  • Through several other methods that also work but that we won’t cover in this guide: Logstash, the `_update_by_query` endpoint, and the `_reindex` endpoint.
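
The first two options could look roughly like the sketch below. It assumes a pipeline named `test` already exists (one is created later in this guide); the index name `myapp-logs`, the template name `myapp_template` and the pattern `myapp-*` are hypothetical placeholders. The template request uses the composable index template API, which is only available in more recent versions of Elasticsearch:

```console
# Option 1: run a single bulk request through the pipeline
POST myapp-logs/_bulk?pipeline=test
{ "index": {} }
{ "message": "2021-05-11T20:14:25.003Z mytest.example.com There seems to be a problem" }

# Option 2: make the pipeline the default for every new index matching myapp-*
PUT _index_template/myapp_template
{
  "index_patterns": ["myapp-*"],
  "template": {
    "settings": {
      "index.default_pipeline": "test"
    }
  }
}
```

With the template in place, documents indexed into a newly created matching index go through the pipeline without the client having to name it in each request.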

Now let’s get our hands dirty and look at how to actually create an ingest pipeline and how to test if it’s working. If you have Kibana open, head over to Dev Tools under the Management tab. (You can also use the new Ingest Node Pipeline UI in Kibana for a more guided approach to creating pipelines by going to `Stack Management` > `Ingest Node Pipelines`.)

How to test a pipeline

To test a pipeline before creating it, you can use the `_simulate` endpoint:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description" : "Test pipeline",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": ["%{NOTSPACE:Date} %{NOTSPACE:Origin} %{GREEDYDATA:raw_message}"]
        }
      },
      {
        "rename": {
          "field": "Origin",
          "target_field": "OriginSite"
        }
      },
      {
        "date" : {
          "field" : "Date",
          "target_field" : "@timestamp",
          "formats" : ["ISO8601"]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2021-05-11T20:14:25.003Z mytest.example.com There seems to be a problem"
      }
    },
    {
      "_source": {
        "message": "2021-02-12T10:24:35.000Z mytest2.example.com There seems to be another problem"
      }
    }
  ]
}

You can add different processors one by one to see if the result that you’re expecting is returned using the sample documents you provide under the `docs` array. Once you confirm that the pipeline is acting as desired, you can create the pipeline named `test`:

PUT _ingest/pipeline/test
{
  "description": "Test pipeline",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{NOTSPACE:Date} %{NOTSPACE:Origin} %{GREEDYDATA:raw_message}"
        ]
      }
    },
    {
      "rename": {
        "field": "Origin",
        "target_field": "OriginSite"
      }
    },
    {
      "date": {
        "field": "Date",
        "target_field": "@timestamp",
        "formats": [
          "ISO8601"
        ]
      }
    },
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}
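
Once the pipeline is stored, a quick way to double-check it is to fetch its definition and simulate it by name. The sketch below assumes the `test` pipeline from the request above; the `verbose` parameter asks Elasticsearch to return the document as it looks after each processor, which helps when tracking down the step that misbehaves:

```console
# Show the stored definition
GET _ingest/pipeline/test

# Run a sample document through the stored pipeline, step by step
POST _ingest/pipeline/test/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "message": "2021-05-11T20:14:25.003Z mytest.example.com There seems to be a problem"
      }
    }
  ]
}
```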

Don’t worry if you make a mistake – you can always resend the PUT command and it will update the pipeline. 

You can also delete it altogether with:

DELETE _ingest/pipeline/test

How to use the enrich processor

The enrich processor for Elasticsearch came out in version 7.5.0 due to an increasing demand to be able to do joins/lookups on a dataset. With the enrich processor, you can import an index, and then use that index to do a static lookup on incoming data to append any additional fields.

For example, if in your dataset you are importing different foods and you’d like to know what color they are, you can import an index that will allow you to “join” on the `fruit` field and import associated fields.

If your incoming data source looks like:

{
  "_id": 1,
  "fruit_type": "watermelon",
  "cost": 20,
  "inventory": 1200
}, 
{
  "_id": 2,
  "fruit_type": "kiwi",
  "cost": 5,
  "inventory": 3000
}

And you’d like to add the color of each of these obtained from a separate index, you can follow the steps below.

First, import the lookup data:

POST _bulk
{ "index" : { "_index" : "fruit_colors", "_id" : "1" } }
{ "color" : "red","fruit" : "watermelon"  }
{ "index" : { "_index" : "fruit_colors", "_id" : "2" } }
{ "color" : "green","fruit" : "kiwi"  }
{ "index" : { "_index" : "fruit_colors", "_id" : "3" } }
{ "color" : "blue","fruit" : "blueberry" }

Then create the enrich policy and execute it to build the enrichment index: 

PUT /_enrich/policy/color_lookup
{
  "match": {
    "indices": "fruit_colors",
    "match_field": "fruit",
    "enrich_fields": ["color"]
  }
}

PUT /_enrich/policy/color_lookup/_execute

Now you can leverage the `color_lookup` enrich policy via the enrich processor in any pipeline. Setting `ignore_missing` to `true` lets any documents that don’t have the lookup field (`fruit_type`) pass through unchanged:

PUT _ingest/pipeline/test
{
  "description": "Test pipeline",
  "processors": [
    {
      "enrich": {
        "ignore_missing": true,
        "policy_name": "color_lookup",
        "field": "fruit_type",
        "target_field": "additional_info"
      }
    }
  ]
}

In this example we used one enrich field, but you can add as many fields as you want.
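
To check that the lookup works, you could simulate the stored pipeline against the two incoming documents from earlier. This is a sketch that assumes the `test` pipeline and the `color_lookup` policy above have been created and executed:

```console
POST _ingest/pipeline/test/_simulate
{
  "docs": [
    { "_source": { "fruit_type": "watermelon", "cost": 20, "inventory": 1200 } },
    { "_source": { "fruit_type": "kiwi", "cost": 5, "inventory": 3000 } }
  ]
}
```

If everything is wired up correctly, each returned document should carry an `additional_info` object holding the matched `fruit` value along with its `color`.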

Keep in mind that to update the lookup, you’ll need to index the new documents into the source index and then rerun the `_execute` command. However, the updated lookup **only applies to data ingested afterwards; all previously ingested data keeps the old lookup values.**
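
A sketch of that refresh cycle is shown below. The new `banana` entry is made-up sample data, and `fruits` is a hypothetical name for the index that holds your already-ingested documents. The last request is optional: it uses the `_update_by_query` endpoint mentioned earlier to push existing documents through the pipeline again so they pick up the new values.

```console
# Add a new entry to the source index and make it searchable right away
POST fruit_colors/_doc/4?refresh=true
{ "color": "yellow", "fruit": "banana" }

# Rebuild the enrich index from the updated source index
PUT /_enrich/policy/color_lookup/_execute

# Optional: re-run existing documents through the pipeline
POST fruits/_update_by_query?pipeline=test
```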

These enrich pipeline processors can be of tremendous use for anyone looking to use a separate data source to add meaningful information to ingested data. You can add several of these processors or keep it simple with one.

Ingest pipelines are a great lightweight way to clean up and enhance your Elasticsearch data so that it fits your needs exactly.
