Welcome to the second part of the Workshop. As usual, to keep each article as compact as possible, I will shortcut the queries to snippets. If you want to see the complete code, please consult my GitHub page for this workshop. With that said, let’s jump right in.
Pipelines are the Swiss army knife for ingesting data into Elasticsearch. With pipelines, you can transform data before indexing them. Pipelines are not as powerful as Logstash, but they are integrated into Elasticsearch and usable from the start.
Pipelines can be used in various APIs:
- Index API
- Reindex API
- Update_by_query API
- as setting of an index or an index-template
- as a setting for beats
Pipelines are using “processors” for data transformations like update values, add or delete a field, compute values, split data into arrays, extract values, and much more. And if no processor fits your need, then there is the script processor.
The raw data
The following text represents our raw input. The data is represented in 5 fields:
"company_name": "Elastic EV",
"address": "800 West El Camino Real, Suite 350",
"city": "Mountain View, Ca 94040",
"ticker_symbol": "ESTC",
"market_cap": "8B"
Create a pipeline with a set processor
We will work heavily with the “city” field. To keep the original field intact, we will copy “city” to “city_array”. The first processor of our new pipeline is a “set”-processor:
PUT _ingest/pipeline/split-city-string-to-array
{ "description": "Pipeline does various changes on incoming company data",
"processors": [
{
"set": {
"field": "city_array",
"copy_from": "city"
} } ] }
Let’s create a document with the Index API and use the pipeline “split-city-string-to-array”:
PUT companies/_doc/1?pipeline=split-city-string-to-array
{ "company_name": "Elastic EV",
"address": "800 West El Camino Real, Suite 350",
"city": "Mountain View, Ca 94040",
"ticker_symbol": "ESTC",
"market_cap": "8B"
}
Let’s check the result:
GET companies/_doc/1
...
"address" : "800 West El Camino Real, Suite 350",
"ticker_symbol" : "ESTC",
"market_cap" : "8B",
"city" : "Mountain View, Ca 94040",
"company_name" : "Elastic EV",
"city_array" : "Mountain View, Ca 94040"
The split processor
Let’s add another processor that splits the text in “city_array” to an array. Since we don’t want to copy the field “city_array” a second time, we extend the set processor with the overwrite parameter:
"set": {
"field": "city_array",
"copy_from": "city",
"override": false
}
},
"split": {
"field": "city_array",
"separator": ","
}
Run a pipeline by _update_by_query
Let’s run a update_by_query and use our pipeline:
POST companies/_update_by_query?pipeline=split-city-string-to-array
{ "query": { "match_all": {} } }
Let’s check how this has worked out. The string “Mountain View, CA 94040” should now be an array with 2 elements:
"_source" : {
"city" : [
"Mountain View",
" Ca 94040"
] }
The foreach and the gsub processor
Looks almost good. As you have noticed, there is an ugly space at the beginning of the second element. Let’s add another processor to remove leading spaces. And because we are dealing with an array, we need to iterate through with a foreach processor:
"foreach": {
"field": "city_array",
"processor": {
"gsub": {
"field": "_ingest._value",
"pattern": "^ ",
"replacement": ""
} }
Run a pipeline by the _simulate API
It makes sense, to test a pipeline by simulating it. Therefore the documents are not getting messed up when we play around. Use the simulate API for the next steps, it will save you time until we get the final version smoothly running:
POST /_ingest/pipeline/split-city-string-to-array/_simulate
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"company_name": "Elastic EV",
"address": "800 West El Camino Real, Suite 350",
"city": "Mountain View, Ca 94040",
"ticker_symbol": "ESTC",
"market_cap": "8B",
"city_array" : [
"Mountain View",
" Ca 94040"
] } } ] }
We are getting an error. The field “city_array” is no anymore a text field and therefore the split processor ends with an error:
"type" : "illegal_argument_exception",
"reason" : "field [city_array] of type [java.util.ArrayList] cannot be cast to [java.lang.String]"
Setting conditions in processors
You can define for most processors an “if-condition”. In this case, we check if the field “city_array” is a text field.
"split": {
"if": "ctx.city_array instanceof String",
"field": "city_array",
"separator": ","
Let’s run the pipeline again and check the results:
"city" : "Mountain View, Ca 94040",
"city_array" : [
"Mountain View",
"Ca 94040"
]
Handling pipeline failures
We don’t need to check the field type. In this case, we are getting a failure, because we did some processing already and can tell the pipeline, that it is OK to ignore a failure of this processor and continue with the rest of the pipeline. We do this by setting the “ignore_failure” parameter:
"split": {
"ignore_failure": true,
"field": "city_array",
"separator": ","
There is an additional parameter, called “on_failure”, which allows calling separate processors if the processor fails. A good example can be found on the official documentation
The script processor
Painless scripting will have its topic in the coming workshop. For now, I like to give the elements in the array “city_array” their fields. To enhance readability, scripting can span over multiple lines if you use three double quotes at the beginning and the end:
"script": {
"tag": "script",
"ignore_failure": true,
"source": """
ctx['city_name'] = ctx['city_array'].0;
def split_statezip=ctx['city_array'].1.splitOnToken(' ');
ctx['state'] = split_statezip[0];
ctx['zip'] = split_statezip[1];
"""
}
The first element of the array gets straightforward stored in the field “city_name”. For the second element, there are other methods in painless, like using dissect or regex. For now, we are fine with a split and store the results in the fields “state” and “zip”:
"city_name" : "Mountain View",
"state" : "Ca",
"zip" : "94040",
The uppercase processor
Looks almost good. But the state must be in uppercase letters. Let’s use the uppercase processor:
"uppercase": {
"field": "state"
Let’s run the pipeline again and check our fields:
"zip" : "94040",
"city_name" : "Mountain View",
"state" : "CA",
The convert processor
The zip code is a string. I would like to have the field as type long, let’s convert the type:
"convert": {
"field": "zip",
"type": "long"
}
Let’s see if the string got converted to long:
"zip" : 94040,
The remove processor
Since the “city_array” field was only used to create the “city_name”, “state” and “zip” fields, we can remove it:
"remove": {
"field": ["city_array", "city"]
}
The rename processor
“city_name” could be shorter. Let’s rename that field to “city”:
"rename": {
"field": "city_name",
"target_field": "city"
}
The full query
PUT _ingest/pipeline/split-city-string-to-array
{
"processors": [
{
"set": {
"field": "city_array",
"copy_from": "city",
"override": false
}
},
{
"split": {
"tag": "split",
"ignore_failure": true,
"field": "city_array",
"separator": ","
}
},
{
"foreach": {
"tag": "foreach",
"field": "city_array",
"processor": {
"gsub": {
"field": "_ingest._value",
"pattern": "^ ",
"replacement": ""
}
}
}
},
{
"script": {
"tag": "script",
"ignore_failure": true,
"source": """
ctx['city_name'] = ctx['city_array'].0;
def split_statezip=ctx['city_array'].1.splitOnToken(' ');
ctx['state'] = split_statezip[0];
ctx['zip'] = split_statezip[1];
"""
}
},
{
"uppercase": {
"field": "state"
}
},
{
"convert": {
"field": "zip",
"type": "long"
}
},
{
"remove": {
"field": ["city_array", "city"]
}
},
{
"rename": {
"field": "city_name",
"target_field": "city"
}
}
]
}
The result
So far our pipeline has passed our simulation. Let’s run a update_by_query and see if the original data can be transformed to the final document:
POST companies/_update_by_query?pipeline=split-city-string-to-array
GET companies/_doc/1
...
"zip" : 94040,
"address" : "800 West El Camino Real, Suite 350",
"ticker_symbol" : "ESTC",
"city" : "Mountain View",
"market_cap" : "8B",
"company_name" : "Elastic EV",
"state" : "CA"
Setting a pipeline as default for an index
Instead of defining the pipeline, every time we ingest data, the pipeline can be set as a default per index:
PUT companies/_settings
{
"index.default_pipeline": "split-city-string-to-array"
}
Conclusion
If you made it here: Congratulations! You should now be able to set up pipelines and transform your data before they are getting indexed.
Of course, there are much more processors. Check the official documentation: elastic.co/guide/elasticsearch/processor
If Google brought you here, you might check also the start of the series, or the whole series.
And as always: the full queries and code is available on the GitHub page for this workshop