Elasticsearch tuning : a simple use case exploring the Elastic Stack, NiFi and Bitcoin

By Stéphane KATTOOR, 07 Feb 2017

This tutorial is going to explore a few ways to improve Elasticsearch performance.

To have a working example – and to make things more interesting – we’re going to graph Bitcoin’s exchange rate on Bitstamp.

End Result

To make this happen, we’re going to use :

  • Bitstamp, which offers a public API which lets us query the current exchange rate
  • Apache NiFi, which is an ETL of sorts : it can pull data from a source, transform it, and inject it somewhere else, defining complex pipelines where needed. We’ll use it to pull the Bitcoin exchange rate from Bitstamp
  • Elasticsearch, which is a powerful and popular indexing software. It can be used as a NoSQL document store and we’re going to use it to store the Bitcoin exchange rate as a time series
  • Timelion, which is a Kibana plugin designed to make sense of, and analyze, time series

Configuring Elasticsearch

Installing Elasticsearch / Kibana

You will find all the necessary instructions to set up your Elastic Stack in the official documentation from Elastic.

System / hardware

Elasticsearch is a resource hungry beast! You will need :

  • (Very) Fast disks : Preferably SSDs, preferably in RAID-0 for maximum I/O performance (or you can mount multiple disks and let Elasticsearch decide where to write). RAID-0 is only reasonable if your Elasticsearch cluster already provides data redundancy through replicas.
  • Lots of RAM : Ample amounts of RAM will be required to run complex queries. It is however recommended to allocate only half of the system memory to Elasticsearch, and leave the remaining half to the system for disk caching.
  • Enough CPU power : Although Elasticsearch is usually less greedy for CPU than it is for disk speed or RAM, indexing and searching can mean going through a lot of data, and you will obviously get better response times with faster CPUs.

Correctly sizing an Elasticsearch cluster is generally a difficult task and it requires experience to do it right.

Identify your functional requirements:

  • how much data you are going to ingest,
  • what type of queries you are going to run,
  • what query response times you want to achieve,
  • etc.

Then test for performance during the early phases of your project: This will allow you to have metrics to correctly size your production platform.

Start with a small platform and make it grow to address your functional requirements.

The beauty of Elasticsearch is that you can grow your cluster very easily by adding more nodes into it, if you designed your indices to allow that. More on this later.

Protip!

When growing the cluster, prefer horizontal scaling (adding more nodes) to vertical scaling (bigger nodes). This not only spreads the load across more machines but also improves fault tolerance.
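
To give an idea of what “adding a node” involves, here is a minimal sketch, assuming Elasticsearch 5.x with its default unicast discovery : a new node only needs to share the cluster name and to know the address of at least one node already in the cluster, for example in its elasticsearch.yml :

# elasticsearch.yml on the new node (illustrative names and addresses)
cluster.name: bitstamp-demo
node.name: node-2
discovery.zen.ping.unicast.hosts: ["10.0.0.1"]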

Elasticsearch settings

A few Elasticsearch settings are critical for good performance :

  • The size of the JVM heap, which should be half the size of the total amount of RAM (leaving the other half for OS-level disk caching). This is set in the jvm.options file.
  • The number of open file handles, which should be as high as possible (usually a system setting : on Linux you will find it in the /etc/security/limits.conf configuration file)
  • Making sure that Elasticsearch memory will not be swapped out of RAM to disk, to prevent unexpected and sporadic drops in performance. A minimal sketch of these three settings follows the list.
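
As an illustration, here is what those three settings boil down to on a hypothetical single node with 16 GB of RAM (adjust the values and paths to your own installation) :

# config/jvm.options : give the JVM heap half of the available RAM
-Xms8g
-Xmx8g

# /etc/security/limits.conf : raise the open file handle limit for the user running Elasticsearch
elasticsearch  soft  nofile  65536
elasticsearch  hard  nofile  65536

# config/elasticsearch.yml : lock the heap in RAM so it is never swapped out
# (the memlock ulimit must also be raised for this to take effect)
bootstrap.memory_lock: true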

There are quite a few more settings of this kind, for which Elastic provides ample documentation.

Protip!

You will need to tune the OS as well as the JVM so as to squeeze the best performance out of Elasticsearch.

Keeping data tidy

Elasticsearch will require more and more resources (Disk, RAM, CPU) as the volume of documents grows. Too much data will make Elasticsearch choke on queries. Therefore it is important to get rid of outdated or obsolete data as soon as possible.

Deleting documents in an Elasticsearch index is an expensive operation, especially if there are lots of them. On the other hand, dropping an entire index is cheap.

Therefore, whenever possible, create time-based indices. For example, each index will hold data for a specific day, week, month, year etc. When you want to get rid of old documents, you can easily drop entire indices.

As an alternative to deleting indices, you can also close them. This will retain the data on disk, but will free up most of the CPU/RAM resources and also leave you the option to re-open them if you need them later on.

Finally, there will always be data stored in indices that you can neither delete nor close because you still use them regularly. Often, though, the older indices cease to be modified : this is typical of time-based data, where you only add documents to the index of the day while the older indices are basically read-only. Those indices can be optimized to reduce their resource requirements.

To summarize :

  • Delete indices when data is obsolete
  • Close indices where data might be infrequently accessed
  • Optimize indices which are no longer updated
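
Each of these operations is a single Elasticsearch API call. Here is a quick sketch, using hypothetical index names that follow the bitstamp-YYYY-MM-dd naming scheme we will set up later in this article :

# Delete an index whose data is obsolete
curl -XDELETE http://localhost:9200/bitstamp-2016-10-01

# Close an index that is only rarely accessed (data stays on disk, resources are freed)
curl -XPOST http://localhost:9200/bitstamp-2016-12-01/_close

# Force-merge a read-only index down to a single segment
curl -XPOST 'http://localhost:9200/bitstamp-2017-01-01/_forcemerge?max_num_segments=1'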

There is a tool which will help you do all this in bulk : curator.

As an example, here is how you would purge data older than 3 months in our case :

Setting the curator YAML configuration files

First you need to create an actions.yml YAML configuration file to describe exactly which criteria you want the data purge to be based on :

---
# Remember, leave a key empty if there is no value.  None will be a string,
# not a Python "NoneType"
#
# Also remember that all examples have 'disable_action' set to True.  If you
# want to use this action as a template, be sure to set this to False after
# copying it.
actions:
  1:
    action: delete_indices
    description: >-
      Delete indices older than 90 days (based on index name), for bitstamp-
      prefixed indices. Ignore the error if the filter does not result in an
      actionable list of indices (ignore_empty_list) and exit cleanly.
    options:
      ignore_empty_list: True
      timeout_override:
      continue_if_exception: False
      disable_action: True
    filters:
    - filtertype: pattern
      kind: prefix
      value: bitstamp-
      exclude:
    - filtertype: age
      source: name
      direction: older
      timestring: '%Y-%m-%d'
      unit: days
      unit_count: 90
      exclude:
Info

This is directly inspired by the example provided in the official documentation.

Refer to this if you want more information on other possible actions (such as closing indices, or reorganizing them through a forceMerge action).

You will also need to create a config.yml file which will tell curator how to connect to the Elasticsearch cluster. This is best described in the official documentation so I will not cover it here.
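
For reference, on a single local node without authentication it typically boils down to something like this (a sketch only, see the official documentation for the full list of options) :

client:
  hosts:
    - 127.0.0.1
  port: 9200
  timeout: 30

logging:
  loglevel: INFO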

Running the curator command

Once those configuration files are created, running curator is as simple as :

curator --config config.yml actions.yml

This is typically a command you will execute by means of a cron job.

You might be interested in the --dry-run option : curator will tell you what it would do if it were to run “for real” but not actually perform the actions. Handy when debugging.
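
For example, a crontab entry running the purge every night at 01:00 might look like this (paths are illustrative) :

# Nightly Elasticsearch clean-up with curator
0 1 * * * curator --config /etc/curator/config.yml /etc/curator/actions.yml
# Same command with --dry-run, handy while validating the actions file
# 0 1 * * * curator --config /etc/curator/config.yml --dry-run /etc/curator/actions.yml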

Protip!

This protip is twofold :

  • Create time-based indices whenever possible to ease their management and tuning
  • Use curator to remove obsolete data, close infrequently-used indices and optimize read-only indices

This will lead to a smaller data footprint, and an easier-on-resources Elasticsearch.

Index configuration and Data modeling

Shards and replicas

Indices are divided in shards, and shards have replicas. Both shards and replicas will be distributed across the different nodes of an Elasticsearch cluster :

  • The number of shards conditions the capacity of the cluster to distribute write operations across different nodes. Too few means that after a certain point, adding nodes will not improve writing performance. Too many will waste RAM.
  • The number of replicas conditions the capacity of the cluster to distribute the read operations, as well as the fault tolerance. Too many will consume disk space. Too few will impact read performance and fault tolerance.

The settings of the indices are critical to Elasticsearch performance.

Protip!

When it comes to shards and replicas, it’s a bit of a Goldilocks principle in action : you should have not too many, not too few, but just enough.

Choose the number of shards of your indices wisely because you cannot change it afterwards without reindexing the data; the number of replicas, on the other hand, can be adjusted at any time on a live index.
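
For instance, the number of replicas can be raised or lowered on a live index with a single settings update (hypothetical index name) :

curl -XPUT http://localhost:9200/bitstamp-2017-02-07/_settings -d '{ "index" : { "number_of_replicas" : 1 } }'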

Field mappings

Indexing, queries and aggregations can be costly in CPU and RAM, but you can use your knowledge of the data you will store to help Elasticsearch do a better job.

For example :

  • if you know that you won’t be searching in specific fields of your documents, you can instruct Elasticsearch not to index them
  • if you won’t be sorting or aggregating, leave fielddata disabled
  • If you know that a numeric field will be within a certain range, you can choose the smallest numeric type

Putting it all together

Both the shard / replica settings and the field mappings can be set in an index template.

Here is the one we will use for our little Bitcoin indexing experiment, which we will save in the bitstamp-template.json file :

{
        "template" : "bitstamp-*",
        "settings": {
                "number_of_shards" : 5,
                "number_of_replicas" : 0
        },
        "mappings" : {
                "_default_" : {
                        "properties" : {
                                "timestamp" : {
                                        "type" : "date",
                                        "format" : "epoch_second"
                                }
                        }
                },
                "bitstampquotes": {
                        "properties" : {
                                "last" : {
                                        "type" : "scaled_float",
                                        "scaling_factor" : 100,
                                        "index" : false,
                                        "coerce" : true
                                },
                                "volume" : {
                                        "type" : "float",
                                        "index" : false,
                                        "coerce" : true
                                }
                        }
                }
        }
}

In this somewhat trivial template, we do the following :

  • We instruct Elasticsearch to apply it to any index whose name starts with “bitstamp-“
  • We set the number of shards to 5 (which is actually the default, and to be honest we could have set it to 1 for this pet project) and the number of replicas to zero, because we are working with a single-node “cluster”.
  • We indicate that the timestamp field is a date field, provided as the number of seconds since January 1st, 1970.
  • The last field is a currency with 2 decimal places, so we map it as a scaled_float with a factor of 100 (which amounts to saying that the last rate will be internally stored by Elasticsearch as a number of cents and divided by 100 when returned)
  • We are never going to search for a document by its last rate or volume, so we instruct Elasticsearch not to bother indexing those fields

You can put this index template in place with the following command :

curl -XPUT http://localhost:9200/_template/bitstamp_template_1?pretty -d @bitstamp-template.json
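
If you want to check that the template was registered as expected, you can simply read it back :

curl http://localhost:9200/_template/bitstamp_template_1?pretty
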
Protip!

Use your knowledge of the data you will store in Elasticsearch to guide it into being more efficient and consume less resources.

Setting up NiFi

NiFi works by fetching data from its source, converting it into a so called “Flowfile” and then letting the Flowfile go through Processors. Each processor will in turn extract or modify data from the Flowfile and then pass it on to the next one. This will go on until the Flowfile has been through the entire workflow you defined.

Installation

Fetching the NiFi distribution, unpacking and starting it is best described in the official documentation so we will not cover this here.

Assembling the Processors

We are going to use 4 processors for this little project :

  • GetHTTP will connect to the Bitstamp API endpoint to get the current exchange rate and volume
  • JoltTransformJSON will be used to retain only the information we are interested in, and to get rid of the rest
  • EvaluateJSONPath will let us fetch the timestamp from within the Flowfile and save it as a Flowfile attribute for later use
  • PutElasticsearch5 will finally index the Flowfile into Elasticsearch using the timestamp Flowfile attribute to specify which index to put it into

Let’s get into the details of each Processor.

Info

You will have to connect to the NiFi web interface (by default http://localhost:8080/nifi when installed locally).

Defining an SSL Context Service

We are going to connect to an HTTPS website in order to collect the Bitcoin trading information, and as such we need to set up a StandardSSLContextService. Among other things, it provides the root certificates used to validate the server’s certificate.

You can do this in NiFi as seen in the following animation :

NiFi's SSL configuration

Tab        | Property            | Value
Settings   | Name                | StandardSSLContextService
Properties | Truststore Filename | /opt/jre/lib/security/cacerts
Properties | Truststore Password | changeit

These settings are those of the default Java Runtime Environment Truststore. You might have to adjust both the path and the password to match your installation.

Positioning the necessary Processors

The following animation will demonstrate how to position the Processors and link them together to compose our workflow :

Processors

Once this is done, it will be time to configure each Processor.

GetHTTP Processor

The GetHTTP Processor is used to download a single webpage. In our case, it will be used to fetch the Bitcoin trading information as a JSON formatted document from the Bitstamp API.

If you query that URL in a web browser, this is what you will see :

GetHTTP

We are basically going to have NiFi do the same through the GetHTTP processor.

As you might expect, we need to configure a few settings :

Tab        | Property            | Value
Properties | URL                 | https://www.bitstamp.net/api/v2/ticker/btcusd/
Properties | Filename            | btcusd.json
Properties | SSL Context Service | StandardSSLContextService
Scheduling | Run Schedule        | 60 sec

Here’s the demo illustrating how to configure all this :

GetHTTP

JoltTransformJSON Processor

Next, we’re going to configure the JoltTransformJSON processor.

Jolt is to JSON what XSLT is to XML. It lets you describe in JSON how to transform an input JSON document into an output JSON document.

We’re going to use this mechanism to keep only what we are interested in from the JSON document we get from the Bitstamp API.

Here are the settings to configure :

Tab        | Property                | Value
Properties | Jolt Transformation DSL | Shift
Properties | Jolt Specification      | See below

The Jolt specification is the JSON document where we basically say that we will map the timestamp, volume and last fields from the input into the same fields in the output. This will implicitly discard the other fields.

Here is what it looks like :

{
  "timestamp" : "timestamp",
  "last" : "last",
  "volume" : "volume"
}
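
For illustration, given a ticker document such as the following (values are made up, and the real API returns more fields than shown here) :

{
  "high" : "1070.00",
  "last" : "1052.33",
  "timestamp" : "1486456789",
  "bid" : "1051.90",
  "volume" : "10045.12"
}

the transformation outputs only the three fields we keep :

{
  "timestamp" : "1486456789",
  "last" : "1052.33",
  "volume" : "10045.12"
}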

JOLT

Protip!

Elasticsearch indices can grow really big really quickly. This means they will require more resources to store (disk space) and process (CPU, RAM).

Always index as little as possible to satisfy your business needs.

EvaluateJsonPath Processor

There will be an Elasticsearch index for each day, so we need to extract the timestamp from the JSON data returned by Bitstamp and set it as a Flowfile attribute so that we can later use it to define the index we want to insert the data into.

We’re going to use the EvaluateJsonPath Processor to do this, by configuring it as shown in the table below :

Tab        | Property    | Value
Properties | Destination | flowfile-attribute
Properties | timestamp   | $.timestamp

EvalJSON

PutElasticsearch5 Processor

Finally, the Flowfile processing will end with it being indexed into Elasticsearch, which will be performed by the PutElasticsearch5 Processor.

Here are the settings to configure :

Tab        | Property             | Value
Properties | Cluster Name         | elasticsearch
Properties | ElasticSearch Hosts  | 127.0.0.1:9300
Properties | Identifier Attribute | uuid
Properties | Index                | bitstamp-${timestamp:multiply(1000):format("yyyy-MM-dd")}
Properties | Type                 | bitstampquotes
Properties | Batch Size           | 100

Worth noting :

  • We set the Index to have a name containing the day of the data, so as to be able to drop entire indices when purging old data
  • The Type name has to match whatever you had configured in your index template so as to be mapped to the proper parameters
  • The Batch Size parameter is left at its default value for the purposes of our toy project; however, it is one to test and tune appropriately. It controls how many documents NiFi will accumulate before sending a bulk indexing request to Elasticsearch

PutES5

Protip!

Use Elasticsearch’s bulk API whenever possible : it lets Elasticsearch process multiple documents at once and significantly improves performance by reducing the per-document request overhead.
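
To make the idea concrete, here is a hand-written sketch of a bulk request with illustrative values (NiFi builds the equivalent request for us). The payload alternates one action line and one document line, newline-terminated, saved here in a hypothetical bulk-requests.json file :

{ "index" : { "_index" : "bitstamp-2017-02-07", "_type" : "bitstampquotes" } }
{ "timestamp" : 1486456789, "last" : 1052.33, "volume" : 10045.12 }
{ "index" : { "_index" : "bitstamp-2017-02-07", "_type" : "bitstampquotes" } }
{ "timestamp" : 1486456849, "last" : 1052.41, "volume" : 10045.67 }

It is then sent to Elasticsearch in a single HTTP request :

curl -XPOST http://localhost:9200/_bulk --data-binary @bulk-requests.json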

Checking the flow behavior

Once this is all in place and configured, we can start the Processors one by one and check that the flow behaves as intended :

Check

Timelion

Timelion is a Kibana plugin which allows you to analyze time series.

It lets you plot and compare different data sources over different timeframes (e.g. website traffic this week compared to last week), compute statistical functions like derivatives and moving averages, and many other things.
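
For example, the week-over-week comparison mentioned above relies on the offset parameter of .es() ; a sketch against the indices we build below could look like this :

.es(index="bitstamp*", timefield="timestamp", metric="avg:last"), .es(index="bitstamp*", timefield="timestamp", metric="avg:last", offset=-1w)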

The following video does a better job than I could at showing what you can expect from Timelion, so I suggest watching it to get a better idea of what you can do :

In our case, we simply want to graph the Bitcoin exchange rate and the trading volume over time :

.es(index="bitstamp*", timefield="timestamp", metric="avg:last").fit(carry).movingaverage(window=10)
.es(index="bitstamp*",timefield="timestamp",metric="avg:volume").fit(carry).yaxis(2)

Let’s go through the first line :

  1. .es() : We use the datasource Elasticsearch (.es) and pull the data from the indices matching bitstamp*
  2. The field holding the time component of the time series is timestamp
  3. We want to plot the average of the last field (this is the last exchange rate)
  4. fit(carry) : if there is no data available when required for a computation, then we’ll just use the last one available
  5. movingaverage(window=10) : finally we’ll plot the moving average over the last 10 values

The plotting of the volume is very similar.

This will result in the following graph :

Final result

Conclusion

Elasticsearch has the well-deserved reputation of being remarkably fast at ingesting data as well as answering queries of all sorts and shapes.

It is indeed possible to reach a truly massive scale : several petabytes of data, on clusters of hundreds or even thousands of nodes.

To be able to do this best, you will need a thorough knowledge of both your business and the technical details of the platform (OS, JVM, Elasticsearch, …).

This definitely has its challenges and complexities, especially at large scale, but it is required to make sure your Elasticsearch cluster remains fast and you get the best bang for your buck.

References

Software stack

Elasticsearch reference material

NiFi reference material
