Cloud Pak for Data Group

Streams Jobs as a Service: Documentation 

Mon November 23, 2020 06:45 PM

Enabling Streams jobs as services on Cloud Pak for Data allows REST access to the data in the Streams application. This feature is supported by two new operators called EndpointSink and EndpointSource.

To enable REST access to the data in any stream in your application, use the EndpointSink operator to consume the data from that stream.

Likewise, to consume data via REST, use the EndpointSource operator to produce a Stream of data received via REST.

Feature overview

To learn more about this feature, watch a demo video, or see a list of tutorials, visit the feature announcement page.


SPL example

Make data available via REST:

To make the data on the ScoredDataStream available via REST, send it to the EndpointSink operator:

  stream<DataSchemaOut> ScoredDataStream = ... // upstream operator invocation

  () as RESTSink = EndpointSink(ScoredDataStream) {
    param
      // optional, comment out to use the default buffer size
      bufferSize: 10000; // number of tuples to keep in memory
  }
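Once the job is running, the tuples buffered by the EndpointSink can be retrieved over HTTP as JSON. The sketch below shows one way a client could do this; the endpoint URL and API token are placeholders (assumptions), since the actual values come from the job's service details in Cloud Pak for Data:

```python
import json
import urllib.request

# Hypothetical values: copy the real endpoint URL and token from the
# job's service details page in Cloud Pak for Data.
ENDPOINT_URL = "https://<cpd-host>/<service-path>/RESTSink/tuples"
API_TOKEN = "<your-api-token>"

def auth_headers(token):
    """Build the authorization headers for the REST request."""
    return {"Authorization": "Bearer " + token,
            "Accept": "application/json"}

def fetch_tuples(url, token):
    """GET the buffered tuples from the EndpointSink as parsed JSON."""
    req = urllib.request.Request(url, headers=auth_headers(token))
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))

if __name__ == "__main__":
    for t in fetch_tuples(ENDPOINT_URL, API_TOKEN):
        print(t)
```

Each element of the returned JSON list corresponds to one tuple on ScoredDataStream.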

Ingest data via REST:

To receive data via REST, invoke the EndpointSource operator:

  composite Demo {
    type
      IncomingDataType = uint64 index, timestamp timeStamp, rstring id;
    graph
      stream<IncomingDataType> IncomingDataStream = EndpointSource() {
        param
          // optional, comment out to use the default buffer size
          bufferSize: 10000; // max number of tuples that can be received at a time
      }
  }

Python example

Make data available via REST:

Invoke the EndpointSink operator from Python. Note that the input stream to the EndpointSink must have a structured schema: either a named tuple or an instance of StreamSchema.

Below is an example of using a named tuple.

import random, time
import typing
from streamsx.topology.topology import Topology
from streamsx.service import EndpointSink

class Readings(typing.NamedTuple):
    reading: float
    index: int
    address: str
    timeStamp: int

def readings() -> typing.Iterable[Readings]:
    counter = 0
    while True:
        address = "A7" + str(random.randint(14, 40)) + "_" + chr(random.randint(65, 123))
        timeStamp = int(time.time())
        yield Readings(random.randint(1, 100), counter, address, timeStamp)
        counter = counter + 1

topo = Topology(name="BasicSample_Send_REST", namespace="sample")
source = topo.source(readings)

# .. do processing here
# send each tuple to a REST endpoint
source.for_each(EndpointSink())

Ingest data via REST:

Use the EndpointSource class:

import datetime
import typing
from streamsx.topology.topology import Topology
import streamsx.topology.context
from streamsx.service import EndpointSource

## Define the schema of the incoming data
class IncomingReadings(typing.NamedTuple):
    index: int
    ID: str
    timeStamp: datetime.datetime

topo = Topology(name="BasicSample_Receive_REST_1", namespace="sample")

# Use the Endpoint source class to receive POST requests
# the schema will be the IncomingReadings class
#  every item received will be a tuple on the incoming_data stream

incoming_data = topo.source(EndpointSource(schema=IncomingReadings))
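For illustration, once the job is deployed a client could POST tuples matching the IncomingReadings schema to the job's REST endpoint. This is a sketch: the endpoint URL and token below are placeholders (assumptions), and the real values come from the job's service details in Cloud Pak for Data:

```python
import json
import urllib.request

# Hypothetical values: the actual URL and token come from the job's
# service details page in Cloud Pak for Data.
ENDPOINT_URL = "https://<cpd-host>/<service-path>/IncomingDataStream/tuples"
API_TOKEN = "<your-api-token>"

def make_reading(index, id_value, time_stamp):
    """Build one JSON payload matching the IncomingReadings schema."""
    return {"index": index, "ID": id_value, "timeStamp": time_stamp}

def post_tuple(url, token, payload):
    """POST one tuple to the EndpointSource and return the HTTP status."""
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=data,
        headers={"Authorization": "Bearer " + token,
                 "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status

if __name__ == "__main__":
    post_tuple(ENDPOINT_URL, API_TOKEN,
               make_reading(1, "sensor-42", "2020-11-23T18:45:00Z"))
```

Each successful POST becomes one tuple on the incoming_data stream.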


Coming soon:
Operator documentation for EndpointSource and EndpointSink:

- parameters
- consistent region and parallel region support
- number of input/output ports
- list of REST API error codes

Accessing the data at runtime

Once the application is running, this is the information you need to access the data:

- Finding the instance
- Data access: JSON format, JSON-to-SPL type mapping
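As a hedged illustration of the JSON-to-SPL type mapping (the exact representation is defined by the operators' documentation, which is still to come), an SPL tuple of type `tuple<uint64 index, rstring id, list<float64> readings>` might travel over REST as a JSON object like this:

```python
import json

# Illustration (assumption): one SPL tuple represented as JSON.
spl_tuple_as_json = {
    "index": 42,             # uint64        -> JSON number
    "id": "sensor-A7",       # rstring       -> JSON string
    "readings": [1.5, 2.0],  # list<float64> -> JSON array
}

print(json.dumps(spl_tuple_as_json))
```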


Implementation details




Tue December 08, 2020 10:48 AM

Hi, Heike,
Link to the video has been fixed, thank you.

Wed November 25, 2020 05:14 AM

Hi Janet, the hyperlink to video demo loops to exactly this same page - no video page. Is that intended?