
Store and Graph Streaming Data with the Elasticsearch Toolkit 

Fri August 07, 2020 02:39 PM

Elasticsearch is a distributed, RESTful search engine that provides near real-time (NRT) search and several APIs (RESTful, Java, etc.) for storing and accessing documents.

With the new Elasticsearch toolkit, you can effortlessly store your Streams data in Elasticsearch.

This article will guide you through how to use the ElasticsearchIndex operator to store your data and visualize it with Grafana, a popular, open-source, data visualization tool.


The toolkit has been tested with Elasticsearch 5.2.

To start storing your Streams data in Elasticsearch, you’ll need:

  1. IBM Streams 4.1 or later. If you don’t have this installed, you can try it by downloading the Quick Start Edition.
  2. The streamsx.elasticsearch toolkit — Clone it from this repository.

Additionally, if you want to graph your stored data, you’ll need a visualization tool. This article will cover how to graph data using Grafana:

  1. Grafana — Install and start Grafana here.

Setup Instructions

  1. Clone and build the streamsx.elasticsearch toolkit by running the following commands:
       git clone <toolkit repository URL>
       cd streamsx.elasticsearch/
       ant all
  2. Add the toolkit as a dependency to your Streams application.
    Note: If you don’t have a Streams application to test with, there is an ECG Simulator sample in the toolkit’s samples folder that you may reference.

Using the Toolkit

Now that the toolkit has been added as a dependency to your application, simply connect the ElasticsearchIndex operator to your data source operator. Elasticsearch should automatically detect and store the correct type for your data. Here is a full list of types Elasticsearch supports.

Setting up the Server

If you have custom server settings, you can configure them through the following parameters:

  • hostName : rstring — Defines the Elasticsearch hostname to connect to. Default: "localhost".
  • hostPort : int32 — Defines the Elasticsearch port to connect to. Default: 9300.
  • clusterName : rstring — Defines the Elasticsearch cluster to connect to. Default: "elasticsearch".

Note: The default Elasticsearch install is configured to use the default parameter values above.

Defining the Index/Type

I’m going to assume that you have a basic understanding of what Elasticsearch is and how it’s structured. If not, read more about it here.

In short, to index a document (that is, to store data), you must define both an index and a type.

The ElasticsearchIndex operator allows you to define these through the following parameters:

  • indexName : rstring — Defines a static index name to use.
  • indexNameAttribute : Attribute — Defines the attribute containing the index names to use.
  • typeName : rstring — Defines a static type name to use.
  • typeNameAttribute : Attribute — Defines the attribute containing the type names to use.

Choose one of the above parameters to define the index, and one to define the type. The *Attribute parameters allow you to define a different index and type for each incoming tuple (document).

For example, look at the following SPL application:

/*
 * The ECGSimulator simulates ECG Lead II values and outputs them, along
 * with a time-stamp.
 */
stream<float64 ecg_value, int64 ecg_timestamp> ECGData = ECGSimulator()
{
    param ... ;
}

/*
 * The ElasticsearchIndex stores the ECGData tuples in the local host's
 * Elasticsearch server. They can be found in the index, "ecg_index", and
 * type, "ecg_type".
 */
() as ElasticsearchIndex_0 = ElasticsearchIndex(ECGData)
{
    param
        indexName : "ecg_index" ;
        typeName : "ecg_type" ;
        storeTimestamps : true ;
        timestampValueAttribute : ecg_timestamp ;
}

The SPL application above uses a static index, "ecg_index", and a static type, "ecg_type". Each incoming tuple will have all its attributes stored in a single document, excluding the ones used to feed the indexNameAttribute, typeNameAttribute, and timestampValueAttribute parameters (see below for more details). Additionally, each document will have a timestamp (provided by the ecg_timestamp attribute) attached to it.
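To make the tuple-to-document mapping concrete, here is a minimal Python sketch of the behavior described above. It is only a model for illustration, not the toolkit's implementation: the function name to_document and its keyword arguments are hypothetical and simply mirror the operator's parameter names. The timestamp field itself is left out here to keep the sketch short.

```python
def to_document(tup, index_name=None, index_name_attribute=None,
                type_name=None, type_name_attribute=None,
                timestamp_value_attribute=None):
    """Model how one tuple maps to (index, type, document source).

    Hypothetical helper: the keyword arguments mirror the operator's
    parameter names, but this is not the toolkit's code.
    """
    # Attributes that feed an *Attribute parameter are not stored in the document.
    routing_attributes = {
        name
        for name in (index_name_attribute, type_name_attribute,
                     timestamp_value_attribute)
        if name is not None
    }
    # A per-tuple attribute, when given, takes the place of the static name.
    index = tup[index_name_attribute] if index_name_attribute else index_name
    doc_type = tup[type_name_attribute] if type_name_attribute else type_name
    source = {key: value for key, value in tup.items()
              if key not in routing_attributes}
    return index, doc_type, source


# Static index and type, as in the ECG example.
ecg_tuple = {"ecg_value": -0.075, "ecg_timestamp": 1501265964320}
print(to_document(ecg_tuple, index_name="ecg_index", type_name="ecg_type",
                  timestamp_value_attribute="ecg_timestamp"))

# Per-tuple index taken from a (made-up) attribute called "target_index".
routed_tuple = {"target_index": "ecg_ward_7", "ecg_value": 0.1}
print(to_document(routed_tuple, index_name_attribute="target_index",
                  type_name="ecg_type"))
```

The second call shows why the *Attribute parameters are useful: the destination index is read from each tuple, so one operator can feed several indices.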

Configuring the IDs

Each document that is indexed is identified by a unique ID. By default, this ID is auto-generated. However, users have the option to define their own custom IDs. This is useful for users who intend to reindex existing documents (indexing a document with an already stored ID will replace it and increment its version).

The ElasticsearchIndex operator allows you to define a custom ID for each document through the following parameters:

  • idName : rstring — Defines a static ID name to use.
  • idNameAttribute : Attribute — Defines the attribute containing the ID names to use.
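The replace-and-increment behavior of custom IDs can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not Elasticsearch or toolkit code: the TinyIndex class is hypothetical, and the auto-generated IDs it produces do not look like real Elasticsearch IDs.

```python
import uuid


class TinyIndex:
    """Hypothetical in-memory stand-in for a single Elasticsearch index."""

    def __init__(self):
        self.docs = {}

    def index(self, source, doc_id=None):
        # Without a custom ID, a fresh unique ID is generated for every document.
        if doc_id is None:
            doc_id = uuid.uuid4().hex
        # Indexing an ID that is already stored replaces the document
        # and increments its version; new documents start at version 1.
        previous = self.docs.get(doc_id)
        version = previous["_version"] + 1 if previous else 1
        self.docs[doc_id] = {"_version": version, "_source": source}
        return doc_id, version


store = TinyIndex()
print(store.index({"ecg_value": -0.075}, doc_id="patient-1"))  # ('patient-1', 1)
print(store.index({"ecg_value": 0.120}, doc_id="patient-1"))   # ('patient-1', 2)
print(len(store.docs))                                         # 1
```

With auto-generated IDs, the same two calls would have produced two separate documents instead of one document at version 2.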

Storing Timestamps

Each document that is indexed can have a timestamp attached to it. These are useful for a variety of reasons. For example, they can be used in visualization tools such as Grafana and Kibana to create time-series charts/graphs.

The ElasticsearchIndex operator allows you to enable and configure the timestamp through the following parameters:

  • storeTimestamps : boolean — Enables storing timestamps.
  • timestampName : rstring — Defines a static timestamp name to use. Default: “timestamp”.
  • timestampValueAttribute : Attribute — Defines the attribute containing the timestamp values to use, in epoch time. By default, it will store the current system time when the document is indexed.

Once storeTimestamps is enabled, you may choose to give the timestamp a custom name using the timestampName parameter. You may also feed it custom timestamp values using the timestampValueAttribute parameter, as with the ecg_timestamp attribute in the example SPL application above. If you don’t supply timestamp values, the operator will generate them for you from the current system time (just make sure that storeTimestamps is set to true).
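As a rough illustration of how an epoch value relates to a stored timestamp string, the following Python sketch converts an epoch value into the date format that appears in the search output later in this article. It assumes the epoch value is in milliseconds, which this article does not state explicitly; the function name and the utc_offset_hours argument are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def epoch_millis_to_timestamp(epoch_ms, utc_offset_hours=0):
    """Format an epoch value (ASSUMED to be milliseconds) as a date string."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    # Split with integer arithmetic to avoid floating-point rounding.
    seconds, millis = divmod(epoch_ms, 1000)
    moment = datetime.fromtimestamp(seconds, tz=tz)
    return (moment.strftime("%Y-%m-%dT%H:%M:%S")
            + f".{millis:03d}"
            + moment.strftime("%z"))


print(epoch_millis_to_timestamp(1501265964320, utc_offset_hours=-4))
# 2017-07-28T14:19:24.320-0400
print(epoch_millis_to_timestamp(0))
# 1970-01-01T00:00:00.000+0000
```

If your attribute holds seconds rather than milliseconds, the graphs will appear to sit near January 1970, which is a quick way to spot a unit mismatch.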

There are other parameters that the ElasticsearchIndex operator provides such as bulkSize, reconnectionPolicyCount, and sizeMetricsEnabled that you can find more details about in the documentation.

Viewing Saved Data

If you want to confirm that data is being sent correctly from your Streams application to Elasticsearch, use the following curl command:

curl -XGET "localhost:9200/{index}/{type}/_search?q=*&pretty"

Simply replace {index} and {type} with the ones you defined in the operator’s parameters. It should display a sample of indexed data.

For example, after running the ECGSimulator sample, you could run:

curl -XGET "localhost:9200/ecg_index/ecg_type/_search?q=*&pretty"

You should get output like this:

{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 150467,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "ecg_index",
        "_type" : "ecg_type",
        "_id" : "AV2KbK0iGIssmSL8l00V",
        "_score" : 1.0,
        "_source" : {
          "ecg_value" : -0.075,
          "timestamp" : "2017-07-28T14:19:24.320-0400"
        }
      },
      ....
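If you would rather script this check than read the JSON by eye, the sketch below builds the same search URL and summarizes a response. The helper names are hypothetical, and the embedded SAMPLE_RESPONSE is the output above trimmed to a single hit and closed so that it parses. It assumes the Elasticsearch 5.x response shape, where "total" is a plain number.

```python
import json
from urllib.parse import quote


def build_search_url(index, doc_type, host="localhost", port=9200):
    """Build the same URL the curl command uses (hypothetical helper)."""
    return (f"http://{host}:{port}/{quote(index, safe='')}/"
            f"{quote(doc_type, safe='')}/_search?q=*&pretty")


def summarize_hits(response_text):
    """Return the total hit count and the stored documents in a response."""
    hits = json.loads(response_text)["hits"]
    return hits["total"], [hit["_source"] for hit in hits["hits"]]


# The output shown above, trimmed to one hit so that it is valid JSON.
SAMPLE_RESPONSE = """
{ "took" : 3, "timed_out" : false,
  "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 },
  "hits" : { "total" : 150467, "max_score" : 1.0, "hits" : [
    { "_index" : "ecg_index", "_type" : "ecg_type",
      "_id" : "AV2KbK0iGIssmSL8l00V", "_score" : 1.0,
      "_source" : { "ecg_value" : -0.075,
                    "timestamp" : "2017-07-28T14:19:24.320-0400" } } ] } }
"""

print(build_search_url("ecg_index", "ecg_type"))
total, documents = summarize_hits(SAMPLE_RESPONSE)
print(total, documents[0]["ecg_value"])
```

Against a live server, the text passed to summarize_hits would come from an HTTP GET of the built URL, for example with urllib.request.urlopen.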

Note: The screenshots in the following sections refer to the ECG sample application in the samples folder, which simulates ECG Lead II data and outputs it to Elasticsearch.

Graphing the Data

Now that there is data stored in Elasticsearch, it’s time to visualize it using Grafana. Grafana can graph the data in real time, so you don’t need to wait for your application to finish running before opening Grafana.


Add a Data Source

The first step to visualizing the data is to add the index as a data source to Grafana. You can do so through the following steps:

    1. Select Data Source in the top-left menu — This should display your existing data sources.
    2. Click on the Add data source button — This should open the configuration.

    3. Change the Type to Elasticsearch and fill in the Name, Url, Index name, Time field name (if applicable), and Version.  Here’s an example of the ECG sample’s configuration:
    4. Click Add.

Creating a Dashboard

The next step is to create a dashboard to hold our graphs. You can do so through the following steps:

  1. Select Dashboards → New from the top-left menu.
  2. Select Graph to add a graph template, and then select the panel title → Edit.

  3. Select your data source from the Panel data source options and click the Add query button. This is what you should see now:
  4. Select your desired Metric; the names of the stored attributes should be populated automatically for you to select from. There are several other configuration options that you can control through the tabs under the graph.
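For intuition about what a time-series graph asks of Elasticsearch, the sketch below builds the kind of query body such a panel relies on: a time-range filter plus a date_histogram aggregation with an average per bucket. It is an approximation for illustration, not the exact request Grafana sends. The function name, the bucket names, and the "1s" interval are assumptions, and the "interval" key matches the Elasticsearch 5.x syntax this article targets.

```python
import json


def build_graph_query(time_field, value_field, start_ms, end_ms, interval="1s"):
    """Build an Elasticsearch 5.x style query body for a time-series graph."""
    return {
        "size": 0,  # only the aggregated buckets are needed, not raw documents
        "query": {
            "range": {
                time_field: {
                    "gte": start_ms,
                    "lte": end_ms,
                    "format": "epoch_millis",
                }
            }
        },
        "aggs": {
            "per_interval": {
                # One bucket per interval along the time axis.
                "date_histogram": {"field": time_field, "interval": interval},
                # One averaged value per bucket: the y-value of the graph.
                "aggs": {"average": {"avg": {"field": value_field}}},
            }
        },
    }


query = build_graph_query("timestamp", "ecg_value",
                          start_ms=1501265964000, end_ms=1501265974000)
print(json.dumps(query, indent=2))
```

The time field and value field here correspond to the Time field name entered for the data source and the attribute chosen for the Metric.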

Once you are finished configuring the graph, close the configuration. From here, you can add more visualizations (e.g., Graphs, Tables, and Singlestats), resize existing ones, add more rows, and more.
Here is an example of a graph created from the ECG sample:


This article has been an introductory overview of how to store Streams data in Elasticsearch and how to visualize it with Grafana. Any feedback or suggestions are appreciated!
