Enabling Streams jobs as services on Cloud Pak for Data allows REST access to the data in the Streams application. This feature is supported by two new operators, EndpointSink and EndpointSource.
To make the data on any stream in your application available via REST, use the EndpointSink operator to consume the data from that stream. Likewise, to ingest data via REST, use the EndpointSource operator to produce a stream of data received via REST.
Feature overview
To learn more about this feature, including a video and a list of tutorials, see the feature announcement page.
Examples
SPL example
Make data available via REST:
To make the data on the ScoredDataStream available via REST, send it to the EndpointSink operator:
stream<DataSchemaOut> ScoredDataStream = com.ibm.spss.streams.analytics::SPSSScoring(data)
{
}

() as RESTSink = EndpointSink(ScoredDataStream)
{
    // optional: uncomment to set the buffer size
    /*param
        bufferSize: 10000; // number of tuples to keep in memory
    */
}
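The bufferSize parameter bounds how many recent tuples the sink keeps in memory for REST clients to read. The retention behavior can be pictured as a bounded buffer that evicts the oldest entry when full (an illustrative sketch only, not the operator's actual implementation):

```python
from collections import deque

# A bounded buffer analogous to bufferSize: once full,
# appending a new tuple evicts the oldest one.
buffer = deque(maxlen=3)
for tup in range(5):
    buffer.append(tup)

print(list(buffer))  # only the three most recent tuples remain: [2, 3, 4]
```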
Ingest data via REST:
To receive data via REST, invoke the EndpointSource operator:
type IncomingDataType = uint64 index, timestamp timeStamp, rstring id;

composite Demo {
    graph
        stream<IncomingDataType> IncomingDataStream = EndpointSource() {
            // optional: uncomment to set the buffer size
            /*param
                bufferSize: 10000; // max number of tuples that can be received at a time
            */
        }
}
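Once the job is running as a service, clients send tuples matching IncomingDataType in the request body. A sketch of assembling such a payload in Python, assuming a JSON encoding with field names taken from the SPL type above (the exact wire format and endpoint URL come from the deployed service):

```python
import json

# Field names follow the SPL type:
# uint64 index, timestamp timeStamp, rstring id
tuple_payload = {
    "index": 1,
    "timeStamp": 1700000000.0,  # seconds since the epoch
    "id": "sensor-42",
}

# Serialize the tuple for the POST body
body = json.dumps(tuple_payload)
print(body)
```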
Python example
Make data available via REST:
Invoke the EndpointSink operator from Python. Note that the input stream to the EndpointSink must have a structured schema: it must use a named tuple as its schema, or an instance of StreamSchema. Below is an example using a named tuple.
import random, time, datetime
import typing
from streamsx.topology.topology import Topology
from streamsx.service import EndpointSink

class Readings(typing.NamedTuple):
    reading: float
    index: int
    address: str
    timeStamp: int

def readings() -> typing.Iterable[Readings]:
    counter = 0
    while True:
        time.sleep(0.1)
        address = "A7" + str(random.randint(14, 40)) + "_" + chr(random.randint(65, 123))
        timeStamp = int(datetime.datetime.now().timestamp())
        yield Readings(random.randint(1, 100), counter, address, timeStamp)
        counter = counter + 1

topo = Topology()
source = topo.source(readings)

# .. do processing here

# send each tuple to a REST endpoint
source.for_each(EndpointSink(buffer_size=100000))
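Because the source is an ordinary Python callable, the generator pattern can be checked on its own before submitting the job. A trimmed standalone variant of the readings() generator above (fields reduced for brevity):

```python
import itertools
import typing

class Readings(typing.NamedTuple):
    reading: float
    index: int

def readings() -> typing.Iterable[Readings]:
    counter = 0
    while True:
        yield Readings(reading=counter * 1.5, index=counter)
        counter += 1

# Take a few tuples to verify the shape of the stream
sample = list(itertools.islice(readings(), 3))
print(sample[0].index, sample[-1].index)  # 0 2
```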
Ingest data via REST:
Use the EndpointSource class:
import datetime
import typing
from streamsx.topology.topology import Topology
import streamsx.topology.context
from streamsx.service import EndpointSource

# Define the schema of the incoming data
class IncomingReadings(typing.NamedTuple):
    index: int
    ID: str
    timeStamp: datetime.datetime

topo = Topology(name="BasicSample_Receive_REST_1", namespace="sample")

# Use the EndpointSource class to receive POST requests.
# The schema will be the IncomingReadings class;
# every item received becomes a tuple on the incoming_data stream.
incoming_data = topo.source(EndpointSource(schema=IncomingReadings))
incoming_data.print()
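A client would then POST JSON matching the IncomingReadings schema to the endpoint. A sketch of constructing such a request with the standard library (the URL here is a placeholder; the real one is assigned when the job is deployed as a service on Cloud Pak for Data):

```python
import json
import urllib.request

# Placeholder URL -- the actual endpoint is assigned by the service
url = "https://example.com/streams/IncomingDataStream"

# Payload fields match the IncomingReadings named tuple
payload = {"index": 0, "ID": "device-1", "timeStamp": "2023-01-01T00:00:00Z"}

request = urllib.request.Request(
    url,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
print(request.get_method())  # POST
# urllib.request.urlopen(request) would send it once the service is running
```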