
Send and receive streaming data via REST with IBM Streams

By NATASHA D'SILVA posted Mon December 07, 2020 07:48 PM

  

When you use IBM Streams to analyze data in real time, your application typically produces a continuous stream of the results. For example, you might use Streams to monitor financial transactions in real time, and then flag potentially fraudulent transactions as they occur.

Two common questions from Streams developers are, “How can I access the results of the Streams application?” and “How can I send data to the Streams application?” Continuing the fraud application example, you might want to access the latest results to display them in a dashboard, or you might wish to send new data to the Streams application for processing.

Streams Jobs as a Service

Prior to Cloud Pak for Data 3.5, the answer was to send the data from Streams to one of the more than a dozen supported messaging systems and databases, such as Apache Kafka or RabbitMQ, and then connect to that system to retrieve the results. This method still works and scales very well.

In the latest version of Streams on Cloud Pak for Data, you can now enable REST access to the data from a running Streams application, or job. This is done by allowing Streams jobs to become services in Cloud Pak for Data. Once the job is running as a service, it provides REST API endpoints that can be used to send/receive data.

This post has a demo of this feature as well as some code examples for using it in SPL and Python applications.




See it in action

Watch this video to see how to enable REST access to the data in a Streams application running in Cloud Pak for Data 3.5.



How to use it

To enable a job as a Cloud Pak for Data service, you add one of the new EndpointSource/EndpointSink operators to the application.
These operators can be invoked from SPL and Python applications. When a job containing at least one of the new operators is submitted, the Streams runtime enables REST API access to the job from outside the OpenShift cluster. Data retrieved from the Streams application is in JSON format, as must be any data sent to the application. Each Streams tuple consumed or produced by the application is a JSON object.
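As a minimal illustration of that mapping (the attribute names and values here are hypothetical, not from any real job), a tuple with attributes id, amount, and flagged would travel over REST as a JSON object:

```python
import json

# A hypothetical Streams tuple, represented the way it crosses the REST API:
# each attribute becomes a key in a JSON object.
tuple_as_json = json.dumps({"id": 42, "amount": 1250.75, "flagged": True})
print(tuple_as_json)  # {"id": 42, "amount": 1250.75, "flagged": true}
```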


These are the three basic steps to start sending and receiving data via REST.

  1. Use the operators to enable the REST endpoints:
    • Modify the Streams application to add the EndpointSink or EndpointSource operator.

      For example, to enable REST access to the stream ResultStream, add the following code snippet:

      () as RESTSink = EndpointSink(ResultStream)
      {
      }

    • Build and submit the application, noting the job id

  2. Get the URLs from the service instance in the CPD UI: Once the application is running, find the REST endpoints.
      1. From the Cloud Pak for Data console, go to Services > Instances.

      2. Find the new job service in the list of instances. Its Type will be "streams-application" and its Name will be of the form <streams_instance>.<space_name>.<job_id>, where streams_instance is the Streams instance name, space_name is the deployment space name, and job_id is the id of the submitted job.

      3. Click the job service to go to the Swagger Documentation page.

      4. Find the URL for the GET or POST endpoint(s) listed under Data access endpoints.
  3. Use the URL to access the data. The final URL will be <CPD_URL> + <GET/POST URL>.
    Note: In this initial release, only the user who submits the job can see the job service in their Instances list. Other users can access the data via the REST API URL. To have access, each user needs to be added to the deployment space where the job was submitted. See the documentation to learn how to add a user to a deployment space.
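The steps above can be sketched with the Python standard library. The base URL, endpoint path, and bearer-token authentication below are all hypothetical placeholders; copy the real values from your cluster and the job service's Swagger page:

```python
import json
import urllib.request

def endpoint_url(cpd_url: str, data_access_path: str) -> str:
    """Join the Cloud Pak for Data base URL with a GET/POST path from the Swagger page."""
    return cpd_url.rstrip("/") + "/" + data_access_path.lstrip("/")

def build_get_request(cpd_url: str, path: str, token: str) -> urllib.request.Request:
    # Bearer-token auth is assumed here; check your cluster's authentication scheme.
    return urllib.request.Request(
        endpoint_url(cpd_url, path),
        headers={"Authorization": "Bearer " + token, "Accept": "application/json"},
    )

def build_post_request(cpd_url: str, path: str, token: str, tuples: list) -> urllib.request.Request:
    # The POST body is JSON-encoded tuple objects (illustrative; verify the exact
    # shape against the Swagger documentation for your job service).
    return urllib.request.Request(
        endpoint_url(cpd_url, path),
        data=json.dumps(tuples).encode("utf-8"),
        headers={"Authorization": "Bearer " + token, "Content-Type": "application/json"},
        method="POST",
    )

# Hypothetical URL and path -- copy yours from the Swagger page:
req = build_get_request(
    "https://mycluster.example.com",
    "/streams_instance/space_name/jobid/endpoints/ResultStream/tuples",
    "MY_TOKEN",
)
# On a real cluster, you would then fetch the tuples:
# tuples = json.loads(urllib.request.urlopen(req).read())
```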





Examples

SPL example


Make data available via REST:

For example, to make the data on the ScoredDataStream available via REST, send it to the EndpointSink operator:

  stream<DataSchemaOut> ScoredDataStream = com.ibm.spss.streams.analytics::SPSSScoring(data)
{
}

() as RESTSink = EndpointSink(ScoredDataStream)
{
  // optional: uncomment to override the default buffer size
  /* param
       bufferSize: 10000; // number of tuples to keep in memory
  */
}


Python example



Make data available via REST:
Use the EndpointSink class. Note that the input stream to the EndpointSink must have a StructuredSchema. This means that it must use a named tuple as its schema, or an instance of StreamSchema.

Below is an example of using a named tuple.

import datetime
import random
import typing

from streamsx.service import EndpointSink
from streamsx.topology.topology import Topology

class Readings(typing.NamedTuple):
    reading: float
    index: int
    address: str
    timeStamp: int

def readings() -> typing.Iterable[Readings]:
    counter = 0
    while True:
        address = "A7" + str(random.randint(14, 40)) + "_" + chr(random.randint(65, 123))
        timeStamp = int(datetime.datetime.now().timestamp())
        yield Readings(random.randint(1, 100), counter, address, timeStamp)
        counter = counter + 1

sender_topology = Topology("sender")
source = sender_topology.source(readings)

# Send the stream to the EndpointSink
source.for_each(EndpointSink(buffer_size=100000), name="REST Sink")
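Once this job is running, the tuples retrieved from the sink's GET endpoint come back as JSON objects whose keys match the named tuple's fields. A sketch of decoding them, using an illustrative payload (the exact response envelope may differ on your cluster; check the Swagger page):

```python
import json
import typing

class Readings(typing.NamedTuple):
    reading: float
    index: int
    address: str
    timeStamp: int

# Illustrative response body: a JSON array of tuple objects.
payload = '[{"reading": 57.0, "index": 0, "address": "A722_f", "timeStamp": 1607378900}]'

# Each JSON object maps back onto the named tuple by field name.
readings = [Readings(**obj) for obj in json.loads(payload)]
print(readings[0].address)  # A722_f
```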



You can follow one of the tutorials below for more complete examples. The documentation also has more code snippets.

Tutorials

Here are some tutorials to get started:

Documentation

The documentation has more examples and more information about how this feature works.

