
Welcome to the first part of the Elasticsearch Workshops series. To keep each article as compact as possible, I will shorten the queries to snippets. If you want to see all the code used in this workshop, please consult my GitHub page for this workshop. With that said, let’s jump right in.

With the enrich processor, Elasticsearch gives you the ability to add already existing data to your incoming documents. It “enriches” your new documents with data that is already stored in your indices (called source indices) and uses a “match field” as a key to identify which documents from the source will contribute data to your incoming document. Once the policy is executed, Elasticsearch reads your source indices and creates an enrichment index.

But there is a caveat: the source indices should be as static as possible. As you will see later, the enrichment index has no live relationship to its source index. If the content of the source index changes (new documents are added or existing documents are updated), the change is not applied automatically to the enrichment index; it requires another execution of the enrichment policy and, with that, a newly created enrichment index. So don’t expect the behavior of an RDBMS, and keep in mind that those enrichment indices use storage.

Create documents that will later be enriched

Let’s start and create a source index with some general data about specific companies. This data is static in nature and changes will be rare, so it’s perfect for a source index:

POST _bulk
{ "index" : { "_index" : "companies", "_id" : "1" } }
{ "company_name": "Elastic EV", "address": "800 West El Camino Real, Suite 350", "city": "Mountain View, CA 94040","ticker_symbol": "ESTC", "market_cap": "8B"}
{ "create" : { "_index" : "companies", "_id" : "2" } }
{ "company_name": "Mongo DB, Inc","address": "1633 Broadway, 38th Floor","city": "New York, NY 10019","ticker_symbol": "MDB","market_cap": "23B"}
{ "create" : { "_index" : "companies", "_id" : "3" } }
{ "company_name": "Splunk Inc","address": "270 Brannan Street","city": "San Francisco, CA 94107", "ticker_symbol": "SPLK","market_cap": "18B"}

Let’s check the companies with the search endpoint:
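The request itself is shortened here; a plain match-all search is enough (the response below is trimmed to the _source of the first hit):

GET companies/_search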

"company_name" : "Elastic EV",
"address" : "800 West El Camino Real, Suite 350",
"city" : "Mountain View, CA 94040",
"ticker_symbol" : "ESTC",
"market_cap" : "8B"

Note that there is a field “ticker_symbol”. This field will be the match field and will later serve as the key to match the documents in the source index with the incoming documents.

Let’s create another index, “stocks”, with the structure of the future incoming documents. We use this index to show how existing documents can be enriched. As you can see, these documents have a field called “ticker”. It will match the company documents that have the same value in their field “ticker_symbol”. Let’s add some documents with a ticker and their last traded price.

POST _bulk
{ "index" : { "_index" : "stocks", "_id" : "1" } }
{ "ticker": "ESTC", "last_trade": 82.5 }
{ "create" : { "_index" : "stocks", "_id" : "2" } }
{ "ticker": "MDB", "last_trade": 365 }

Add an enrichment policy

Our enrichment policy looks like this: in case the value of the match-field “ticker” of the incoming document matches with the field “ticker_symbol” of existing documents in our source index “companies”, please enrich the incoming document with the values for the company name, the address, city and the market-cap of the matching document in the source index.
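Conceptually, the enrich processor performs a lookup join: the match-field value of the incoming document is used as a key into the enrichment index. Here is a minimal Python sketch of that idea (plain dictionaries, purely illustrative, not the Elasticsearch API):

```python
# Source data keyed by the match field ("ticker_symbol"), analogous to the
# lookup structure the enrichment index provides.
companies = {
    "ESTC": {"company_name": "Elastic EV", "market_cap": "8B"},
    "MDB": {"company_name": "Mongo DB, Inc", "market_cap": "23B"},
}

def enrich(doc, lookup, match_field="ticker", target_field="company"):
    """Copy the matching lookup entry into the incoming document."""
    match = lookup.get(doc.get(match_field))
    if match is not None:
        doc[target_field] = match
    return doc

enriched = enrich({"ticker": "ESTC", "last_trade": 82.5}, companies)
# "enriched" now carries a "company" object with the Elastic EV data
```

Like the real processor, a document whose ticker has no counterpart in the lookup is simply left unenriched — exactly the situation we will run into later.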

PUT /_enrich/policy/add_company_data_policy
{
  "match": {
    "indices": "companies",
    "match_field": "ticker_symbol",
    "enrich_fields": [
      "company_name", "address", "city", "market_cap"
    ]
  }
}

By executing this policy, the enrichment index will be created:

PUT /_enrich/policy/add_company_data_policy/_execute

Let’s check that enrichment index:

GET .enrich-add_company_data_policy

"provided_name" : ".enrich-add_company_data_policy-1643393128460",

Keep an eye on that name; it will come in handy for comparison later.

Add a pipeline that uses the enrichment policy

Create a pipeline with an “enrich” processor that uses the enrichment policy and matches the value stored in the field “ticker” with the “ticker_symbol” of our existing documents. The additional data is stored in the field “company”:

PUT _ingest/pipeline/enrich_stock_data
{
  "processors": [
    {
      "set": {
        "field": "enriched",
        "value": 1
      }
    },
    {
      "enrich": {
        "policy_name": "add_company_data_policy",
        "field": "ticker",
        "target_field": "company"
      }
    }
  ]
}

Enrich existing documents

The existing documents will now be reindexed to a new index called “full_stock_data”:

POST _reindex
{
  "source": {
    "index": "stocks"
  },
  "dest": {
    "index": "full_stock_data",
    "pipeline": "enrich_stock_data"
  }
}

The newly enriched documents now look like this:

"ticker" : "ESTC",
"last_trade" : 82.5,
"enriched" : 1,
"company" : {
   "address" : "800 West El Camino Real, Suite 350",
   "ticker_symbol" : "ESTC",
   "market_cap" : "8B",
   "city" : "Mountain View, CA 94040",
   "company_name" : "Elastic EV"

Enrich incoming data

Now, for incoming data, we use the pipeline again, this time with a PUT request directly into our new index “full_stock_data”:

PUT /full_stock_data/_doc/3?pipeline=enrich_stock_data
{
  "ticker": "SPLK",
  "last_trade": 113
}

Let’s check if that has worked out and take a look at document 3:
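The check is a plain document GET (the response below is trimmed to the _source):

GET full_stock_data/_doc/3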

"ticker" : "SPLK",
"last_trade" : 113,
"enriched" : 1,
"company" : {
  "address" : "270 Brannan Street",
  "ticker_symbol" : "SPLK",
  "market_cap" : "18B",
  "city" : "San Francisco, CA 94107",
  "company_name" : "Splunk Inc"

Good. Let’s make it a bit easier and add the pipeline as the default pipeline of “full_stock_data”:

PUT full_stock_data/_settings
{ "index.default_pipeline": "enrich_stock_data" }

Let’s add another company. Remember: our source index “companies” should be as static as possible. Soon you will see why. Here is another Big-Data company:

POST companies/_doc/4
{
  "company_name": "Datadog, Inc.",
  "address": "620 8th Avenue, 45th Floor",
  "city": "New York, NY 10018",
  "ticker_symbol": "DDOG",
  "market_cap": "40B"
}

Let’s ingest another document with the latest trade for Datadog:

PUT full_stock_data/_doc/4
{ "ticker": "DDOG", "last_trade": 113 }

Let’s check document 4 and see if it worked as expected:

"found" : true,
"_source" : {
  "ticker" : "DDOG",
  "last_trade" : 113

That has not worked. What happened? Remember: the enrichment index is created when the enrichment policy is executed, and it reflects the source index at execution time. Datadog was not yet in the source index at that time, so the incoming document did not match any document in the enrichment index and was not enriched.

Fix documents that could not be enriched by the last run

To create a new enrichment index, we need to execute the policy again:

PUT /_enrich/policy/add_company_data_policy/_execute

Let’s check if a new enrichment index is created:

GET .enrich-add_company_data_policy

"provided_name" : ".enrich-add_company_data_policy-1643393302515"

Let’s fix the already indexed document for Datadog with an update_by_query. Because “enrich_stock_data” is now the default pipeline of the index, updating the documents in place sends them through the enrichment again; we only select the documents that are still missing the “company” field:

POST full_stock_data/_update_by_query
{
  "query": {
    "bool": {
      "must_not": [
        {
          "exists": {
            "field": "company"
          }
        }
      ]
    }
  }
}

Let’s check if we fixed the missing company field in document 4:

"ticker" : "DDOG",
"last_trade" : 113,
"enriched" : 1,
"company" : {
  "address" : "620 8th Avenue, 45th Floor",
  "ticker_symbol" : "DDOG",
  "market_cap" : "40B",
  "city" : "New York, NY 10018",
  "company_name" : "Datadog, Inc."


If you made it here: Congratulations! You should now be able to enrich incoming documents with additional data.

The enrich processor needs to be handled with care. If your source index is a static dataset that is updated in a nightly run or less often, it can be very helpful. But keep in mind that each execution of the policy creates a new enrichment index, which uses storage and memory:

GET _cat/indices/.enrich-add_company_data_policy*,companies,full_stock_data?s=i&v&h=idx,storeSize

idx                                           storeSize
.enrich-add_company_data_policy-1643402838136       4kb
.enrich-add_company_data_policy-1643445756291       4kb
companies                                        13.6kb
full_stock_data                                  28.5kb

Of course, there is much more to this topic; please check out the official documentation.

If Google brought you here, you might also want to check out the start of the series, or the whole series.

And as always: the full queries and code are available on the GitHub page for this workshop.
