Streams Jobs as a Service: Documentation



Mon November 23, 2020 06:45 PM


Enabling Streams jobs as services on Cloud Pak for Data allows REST access to the data in the Streams application. This feature is supported by two new operators called EndpointSink and EndpointSource.

To enable REST access to the data in any stream in your application, use the EndpointSink operator to consume the data from that stream.

Likewise, to ingest data into your application via REST, use the EndpointSource operator, which produces a stream of the data it receives.



Table of Contents

Feature overview
Examples
  SPL example
  Python example


Feature overview

To learn more about this feature, including a video and a list of tutorials, see the feature announcement page.


Examples



SPL example

Make data available via REST:


To make the data on the ScoredDataStream available via REST, send it to the EndpointSink operator:

stream<DataSchemaOut> ScoredDataStream = com.ibm.spss.streams.analytics::SPSSScoring(data)
{
}

() as RESTSink = EndpointSink(ScoredDataStream)
{
    // optional: uncomment the param clause below to set the buffer size
    /*
    param
        bufferSize: 10000; // number of tuples to keep in memory
    */
}
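
Once the job is running, a REST client can fetch the tuples buffered by the EndpointSink. The following is a minimal sketch using Python's requests library; the endpoint URL and bearer token are placeholders, since the actual values come from the service endpoints exposed for your running job in Cloud Pak for Data.

import requests

# Placeholders: substitute the endpoint URL and token exposed for your running job.
ENDPOINT_URL = "https://<cpd-host>/<job-endpoint-path>/ScoredDataStream"
TOKEN = "<bearer-token>"

# fetch the tuples currently buffered by the EndpointSink
resp = requests.get(ENDPOINT_URL,
                    headers={"Authorization": "Bearer " + TOKEN},
                    verify=False)  # only if the cluster uses a self-signed certificate
print(resp.json())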



Ingest data via REST:

To receive data via REST, invoke the EndpointSource operator:

type IncomingDataType = uint64 index, timestamp timeStamp, rstring id;

composite Demo
{
    graph
        stream<IncomingDataType> IncomingDataStream = EndpointSource()
        {
            // optional: uncomment the param clause below to set the buffer size
            /*
            param
                bufferSize: 10000; // max number of tuples that can be received at a time
            */
        }
}
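
Data can then be pushed into IncomingDataStream by POSTing JSON tuples to the corresponding endpoint. The sketch below again uses the requests library; the URL, token, and the JSON encoding of the timestamp attribute are placeholders and depend on your environment and the endpoint's schema mapping.

import requests

ENDPOINT_URL = "https://<cpd-host>/<job-endpoint-path>/IncomingDataStream"  # placeholder
TOKEN = "<bearer-token>"                                                    # placeholder

# one JSON object per tuple, matching the IncomingDataType schema
payload = [{"index": 1, "timeStamp": "2020-11-23T18:45:00Z", "id": "A714_B"}]

resp = requests.post(ENDPOINT_URL,
                     headers={"Authorization": "Bearer " + TOKEN,
                              "Content-Type": "application/json"},
                     json=payload,
                     verify=False)
print(resp.status_code)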

Python example



Make data available via REST:

Invoke the EndpointSink operator from Python. Note that the input stream to the EndpointSink must have a structured schema: it must use a named tuple as its schema, or an instance of StreamSchema.

Below is an example of using a named tuple.

import random, time, datetime
import typing
from streamsx.topology.topology import Topology
from streamsx.service import EndpointSink

class Readings(typing.NamedTuple):
    reading: float
    index: int
    address: str
    timeStamp: int

def readings() -> typing.Iterable[Readings]:
    counter = 0
    while True:
        time.sleep(0.1)
        address = "A7" + str(random.randint(14, 40)) + "_" + chr(random.randint(65, 123))
        timeStamp = int(datetime.datetime.now().timestamp())
        yield Readings(random.randint(1, 100), counter, address, timeStamp)
        counter = counter + 1

topo = Topology(name="BasicSample_Send_REST", namespace="sample")

source = topo.source(readings)

# ... do processing here

# make each tuple available via the REST endpoint
source.for_each(EndpointSink(buffer_size=100000))
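
As noted above, an instance of StreamSchema can be used instead of a named tuple. The following is a minimal sketch, assuming the same source stream as in the example above and an SPL-style schema string of my own choosing:

from streamsx.topology.schema import StreamSchema
from streamsx.service import EndpointSink

# map the Python tuples onto an SPL structured schema before sinking them
schema = StreamSchema('tuple<float64 reading, int64 index, rstring address, int64 timeStamp>')
structured = source.map(lambda t: (t.reading, t.index, t.address, t.timeStamp), schema=schema)
structured.for_each(EndpointSink(buffer_size=100000))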

Ingest data via REST:

Use the EndpointSource class:



import datetime
import typing
from streamsx.topology.topology import Topology
import streamsx.topology.context
from streamsx.service import EndpointSource

topo = Topology(name="BasicSample_Receive_REST_1", namespace="sample")

# define the schema of the incoming data
class IncomingReadings(typing.NamedTuple):
    index: int
    ID: str
    timeStamp: datetime.datetime

# Use the EndpointSource class to receive POST requests.
# The schema will be the IncomingReadings class;
# every item received becomes a tuple on the incoming_data stream.
incoming_data = topo.source(EndpointSource(schema=IncomingReadings))

incoming_data.print()
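
To run either topology, submit it to your Streams instance. The following is a minimal sketch using streamsx.topology.context; how the connection configuration is obtained (for example, from the Cloud Pak for Data service instance details) depends on your environment.

import streamsx.topology.context as context

# cfg holds the connection details for your Streams instance;
# how you obtain it is environment-specific.
cfg = {}
submission = context.submit(context.ContextTypes.DISTRIBUTED, topo, config=cfg)
print(submission)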





Comments

Tue December 08, 2020 10:48 AM

Hi Heike,
The link to the video has been fixed, thank you.

Wed November 25, 2020 05:14 AM

Hi Janet, the hyperlink to the video demo loops back to this same page; there is no video page. Is that intended?