When you use IBM Streams to analyze data in real time, your application typically produces a continuous stream of the results. For example, you might use Streams to monitor financial transactions in real time, and then flag potentially fraudulent transactions as they occur.
Two common questions from Streams developers are, “How can I access the results of the Streams application”, and , “How can I send data to the Streams application?” Continuing the fraud application example, you might want to access the latest results to display it in a dashboard, like shown above. Or you might wish to send new data to the Streams application for processing.
Streams Jobs as a Service
Prior to Cloud Pak for Data 3.5, the answer was to send the data from Streams to one of the over a dozen supported messaging systems and databases, like Apache Kakfa, or RabbitMQ, and then connect to that system to retrieve the results. This method still works and scales very well.
In the latest version of Streams on Cloud Pak for Data, you can now enable REST access to the data from a running Streams application, or job. This is done by allowing Streams jobs to become services in Cloud Pak for Data. Once the job is running as a service, it provides REST API endpoints that can be used to send/receive data.
This post has a demo of this feature as well as some code examples for using it in SPL and Python applications.
Table of contents
See it in action
Watch this video to see how to enable REST access to the data in a Streams application running in Cloud Pak for Data 3.5.
How to use it
To enable a job as a Cloud Pak for Data service, you must invoke one of the new
EndpointSink operators to the application.
These operators can be invoked from SPL and Python applications. When a job containing at least one of the new operators is submitted, the Streams runtime enables REST API access to the job from outside the OpenShift cluster. Data retrieved from the Streams application is in JSON format, as must be any data sent to the application. Each Streams tuple consumed or produced by the application is a JSON object.
These are the 3 basic steps to start sending/receiving data via REST.
- Use the operators to enable the REST endpoints:
- Get the URLs from the service instance in the CPD UI: Once the application is running, find the REST endpoints.
- From the Cloud Pak for Data console, go to Services > Instances.
- Find the new job service in the list of instances. Under Type it will have "streams-application" and the Name will be of the form
streams_instance is the instance name,
space_name is the deployment space name, and
job_id is the id of the submitted job.
- Click the job service to go to the Swagger Documentation page.
- Find the URL for the GET or POST endpoint(s) listed under Data access endpoints.
- Use the URL to access data, the final URL will be
Note: In this initial release, only the user who submits the job can see the job service in their Instances list. Other users can access the data via the REST API URL. To have access, each user needs to be added to the deployment space where the job was submitted. See the documentation to learn how to add a user to a deployment space.
Make data avaialble via REST:
For example, to make the data on the
ScoredDataStream available via REST, send it to the
stream<DataSchemaOut> ScoredDataStream = com.ibm.spss.streams.analytics::SPSSScoring(data)
() as RESTSink = EndpointSink(ScoredDataStream)
//optional, comment out to use buffer size
bufferSize: 10000; //number of tuples to keep in memory
Make data avaialble via REST:
EndpointSink class. Note that the input stream to the EndpointSink must have a
StructuredSchema. This means that it must use a named tuple as its schema, or an instance of StreamSchema.
Below is an example of using a named tuple.
from streamsx.service import EndpointSink
def readings() -> typing.Iterable[Readings] :
counter = 0
address = "A7" + str(random.randint(14, 40)) + "_"+ chr(random.randint(65,123))
timeStamp = int(datetime.datetime.now().timestamp())
yield Readings(random.randint(1,100), counter, address, timeStamp)
counter = counter + 1
source = sender_topology.source(readings)
## Send the stream to the Endpoint Sink
source.for_each(EndpointSink(buffer_size=100000), name="REST Sink")
You can follow one of the following tutorials for more complete examples. The documentation also has more code snippets.
Here are some tutorials to get started:
The documentation has more examples and more information about how this feature works.