Microservices in Streams: Using the Publish/Subscribe Operators 

Tue August 25, 2020 06:09 PM

Written by Dan Debrunner.

Overview

As discussed in this post, Streams dynamic connections allow you to design a Streams application as a set of microservices.

Dynamic connections were originally implemented by the Import and Export operators. However, the com.ibm.streamsx.topology toolkit now provides the publish-subscribe paradigm through the Publish and Subscribe SPL operators and the publish and subscribe functions in the Java and Python APIs.

This is a simplification of the connection mechanism provided by Import/Export.

This article discusses the motivation behind this simplification and gives an overview of how to use publish-subscribe in Java, Scala, SPL and Python applications.

Why use Publish/Subscribe instead of Import/Export?

Publish-subscribe was introduced for three reasons:
  1. To make the simple case simple when using a microservice architecture.
  2. To support Java and Python developers. One requirement in supporting other languages was that the developer need not have SPL knowledge; requiring a developer to understand SPL subscription expressions would break that requirement.
  3. To allow stream connections between streaming applications implemented in different programming languages.

Publish-subscribe then allows a Streams instance to perform streaming analytics using a collection of microservices (Streams applications) implemented in a variety of languages, so that each task can use the most suitable programming language. These microservices connect through streams using publish-subscribe. For example, ingest and data-prep microservices might be implemented in SPL for high performance, while a Python microservice executes a scoring algorithm from an existing Python package, and a Java microservice provides connectivity to an external service.

Topics

With publish-subscribe the microservice interface is a published stream which is defined by:
  • A topic. For example, buses/locations
  • A stream type.
A topic is just a string following the MQTT topic style, where / is a separator between levels. Subscribers then match publishers using a topic pattern and type. A topic pattern may be an exact match (e.g. buses/locations) or include a wildcard character:
  • + to match topics at a single level, e.g. buses/+ matches buses/locations and buses/alerts.
  • # to match all topics below a level, e.g. buses/# matches buses/locations and buses/updates/routes.
Regardless of the wildcard, a subscriber only matches a topic if the type also matches.
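The matching rules above can be sketched in a few lines of Python. This is only an illustration of the semantics described in this section, not the toolkit's implementation (which relies on SPL dynamic connections). The function names topic_matches and subscription_matches are hypothetical, stream types are represented as plain strings, and the sketch assumes that # appears only as the last level of a pattern and matches one or more levels below it.

```python
def topic_matches(pattern, topic):
    """Return True if an MQTT-style topic pattern matches a concrete topic."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(pattern_levels):
        if level == "#":
            # Assumption: '#' is only valid as the last level and
            # matches one or more remaining topic levels.
            return i == len(pattern_levels) - 1 and len(topic_levels) > i
        if i >= len(topic_levels):
            return False  # pattern is deeper than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs ('+' accepts any single level)
    # Without '#', pattern and topic must have the same depth.
    return len(pattern_levels) == len(topic_levels)


def subscription_matches(pattern, subscribed_type, topic, published_type):
    """A subscriber matches only if both the stream type and the topic match."""
    return subscribed_type == published_type and topic_matches(pattern, topic)


print(topic_matches("buses/+", "buses/alerts"))
print(topic_matches("buses/#", "buses/updates/routes"))
print(subscription_matches("buses/+", "String", "buses/alerts", "Json"))
```

The last call is rejected even though the topic pattern matches, because the subscribed and published types differ.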

Microservice architecture

Apart from the simplification of topic-based matching, publish-subscribe fulfills the microservice architecture by not requiring the subscriber to know how an application is publishing its stream, thus truly creating independence between publishers and subscribers. This includes correct behavior:
  • When the publisher, the subscriber, or both are in parallel regions, for example publishing a stream from a parallel region of width three while the subscriber is not in a parallel region, or is in a parallel region of width two.
  • Regardless of whether the publishing application allows filters.
With simplification comes some loss of functionality: applications cannot dynamically change their published or subscribed topic. Under the covers, publish-subscribe is implemented using SPL dynamic connections, and the set of properties for the published stream is documented so that sophisticated SPL developers can integrate the full functionality of dynamic connections if desired.

Publish-subscribe with SPL

An SPL application publishes a stream by invoking the com.ibm.streamsx.topology.topic::Publish operator.
use com.ibm.streamsx.topology.topic::Publish;
use com.ibm.streamsx.transportation.vehicle::VehicleLocation;
...
stream<VehicleLocation> Locations = ...

() as PubLoc = Publish(Locations) {
param
topic: "buses/locations";
}
An SPL application then subscribes to streams by invoking the com.ibm.streamsx.topology.topic::Subscribe operator.
use com.ibm.streamsx.topology.topic::Subscribe;
use com.ibm.streamsx.transportation.vehicle::VehicleLocation;
...
stream<VehicleLocation> Locs = Subscribe() {
param
topic: "buses/locations";
streamType: VehicleLocation;
}
Note that SPL uses structural type equivalence, so the actual type definition need not be the same between publisher and subscriber, but it must be structurally the same for a match. So in this case the publisher can use the type from the com.ibm.streamsx.transportation toolkit, while the subscriber could declare a structurally identical tuple type of its own so as not to have a dependency on the transportation toolkit.
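As a rough illustration of structural equivalence, the Python sketch below models an SPL tuple type as an optional name plus an ordered list of (type, attribute) pairs, and compares only the attribute lists. The attribute names are hypothetical, since the real VehicleLocation definition is not shown in this article, and the sketch assumes that attribute order is significant.

```python
# Hypothetical attributes; the actual VehicleLocation type may differ.
PUBLISHED = ("VehicleLocation", [("rstring", "id"),
                                 ("float64", "latitude"),
                                 ("float64", "longitude")])

# The subscriber declares an anonymous tuple with the same structure.
SUBSCRIBED = (None, [("rstring", "id"),
                     ("float64", "latitude"),
                     ("float64", "longitude")])

# Same attributes, but a different type for one of them.
MISMATCHED = (None, [("rstring", "id"),
                     ("float32", "latitude"),
                     ("float64", "longitude")])


def structurally_equal(type_a, type_b):
    """Ignore the type names; compare attribute types and names in order."""
    return type_a[1] == type_b[1]


print(structurally_equal(PUBLISHED, SUBSCRIBED))
print(structurally_equal(PUBLISHED, MISMATCHED))
```

The first comparison succeeds because only the structure is compared; the second fails because one attribute type differs.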

Publish-subscribe with Java application API

A Java Streams application publishes a TStream or SPLStream by invoking the publish method. For a TStream the type of the stream is the type of the Java object; for example, a TStream<Bus> publishes a stream of Java com.buscompany.vehicles.Bus objects.
import com.buscompany.vehicles.Bus;
...

TStream<Bus> stream = ...

stream.publish("buses");
A Java Streams application subscribes to a stream using the Topology.subscribe method, providing a topic pattern and a type:
import com.buscompany.vehicles.Bus;
...
TStream<Bus> stream = topology.subscribe("buses", Bus.class);
When a stream is published using a Java type, a subscription is matched on the exact class only: a published stream with the same topic but a subclass of com.buscompany.vehicles.Bus would not be matched by this subscription. If the Java stream is an SPLStream then the published type is the stream's SPL schema.
SPLStream alerts = ...
// type of the published stream is alerts.getSchema()
alerts.publish("buses/alerts");
A Java Streams application subscribes to a stream with an SPL schema type using SPLStreams.subscribe():
StreamSchema ALERT_SCHEMA = ...
SPLStream alerts = SPLStreams.subscribe(topology, "buses/alerts", ALERT_SCHEMA);
Scala applications use the same methods as Java applications for publish and subscribe.

Publish-subscribe with Python application API

A Python Streams application publishes a Stream by calling its publish function:
stream = ...
stream.publish('buses/efficiency')
By default the published type is Python object, so any Python Streams application can subscribe to the stream:
stream = topology.subscribe('buses/efficiency')
A Python stream may be published as a stream of strings or JSON objects by using the optional schema argument.
# each tuple (Python object) is published as a string using str(tuple)
stream.publish('buses/efficiency/strings', schema=CommonSchema.String)

# each tuple (Python object) is published as JSON using json.dumps(tuple)
stream.publish('buses/efficiency/json', schema=CommonSchema.Json)
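To make the difference between the two schemas concrete, the standalone sketch below applies the same two conversions named above, str() and json.dumps(), to a single sample tuple. It runs without a Streams instance; the sample dictionary and its keys are hypothetical.

```python
import json

# Hypothetical tuple flowing on the stream.
reading = {"bus": "B7", "mpg": 6.5}

# What a String subscriber would receive: the Python repr-style text.
as_string = str(reading)

# What a JSON subscriber would receive: a JSON document.
as_json = json.dumps(reading)

print(as_string)  # {'bus': 'B7', 'mpg': 6.5}
print(as_json)    # {"bus": "B7", "mpg": 6.5}

# Only the JSON form can be parsed back by a generic JSON parser.
assert json.loads(as_json) == reading
```

Publishing as JSON is therefore usually the better choice when subscribers written in other languages need the individual fields rather than a display string.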

Subscribe also takes an optional schema parameter to specify the stream type:
# Subscribe to a published stream with type String
stream = topology.subscribe('buses/efficiency/strings', schema=CommonSchema.String)
Published streams with arbitrary SPL schemas can be subscribed to:
# Subscribe to a published stream with an SPL tuple type
schema = StreamSchema('tuple<type name [, ...]>')
stream = topology.subscribe('buses/efficiency', schema)

Publish-subscribe across languages

Using the microservice architecture, a complete Streams system can contain multiple applications interacting with each other through the publish-subscribe model, and each application can be implemented in the programming language best suited to its task. Publish-subscribe works across programming languages, allowing:
  • the publisher and subscriber to be implemented in different languages
  • applications implemented in different languages to publish to the same topic and type
  • applications implemented in different languages to subscribe to the same topic and type
A publish-subscribe stream type is programming language independent if it is one of the following types:
  • JSON - A stream of JSON objects.
  • SPL schema - A stream of SPL tuples with a declared schema.
  • String - A stream of string objects (e.g. lines from a file).
  • Binary - A stream of binary objects (blobs).
  • XML - A stream of XML documents.
Once a stream is published in any of these formats, any application can consume it (some current limitations exist for Python applications). For example, a TStream<JSONObject> published by a Java application can be subscribed to by an SPL application using a Json schema, or by a Python application using CommonSchema.Json. A stream of Java objects published by Java or Scala can only be subscribed to by Java and Scala applications. A stream of Python objects can only be subscribed to by Python applications.
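The compatibility rules in this section can be summarized as a small lookup, sketched here in Python. The function name can_subscribe and the type labels are hypothetical, and the Python restrictions encoded here (no XML or Blob streams) are taken from the per-type tables later in this article, not from the toolkit itself.

```python
LANGUAGE_INDEPENDENT = {"JSON", "SPL", "String", "Blob", "XML"}
JVM_LANGUAGES = {"Java", "Scala"}
# Per this article, Python does not yet support XML or Blob streams.
PYTHON_UNSUPPORTED = {"XML", "Blob"}


def can_subscribe(published_type, subscriber_language):
    """Return True if an application in subscriber_language can consume the stream."""
    if published_type in LANGUAGE_INDEPENDENT:
        if subscriber_language == "Python":
            return published_type not in PYTHON_UNSUPPORTED
        return True
    if published_type == "Java object":
        # Java objects are only visible to JVM-based applications.
        return subscriber_language in JVM_LANGUAGES
    if published_type == "Python object":
        return subscriber_language == "Python"
    raise ValueError("unknown stream type: " + published_type)


print(can_subscribe("JSON", "SPL"))
print(can_subscribe("Java object", "Python"))
```

A publisher that wants the widest audience would therefore pick JSON, String or an SPL schema over a language-specific object type.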

JSON streams

JSON is a common data interchange format and thus a natural publish format to allow subscription by any application.

| Language | Stream type | Notes |
|---|---|---|
| SPL | com.ibm.streamsx.topology::Json, com.ibm.streamsx.json::Json, or tuple<rstring jsonString> | All these schemas are structurally equivalent and thus may be used to represent JSON streams for publish-subscribe. Operators and functions from the com.ibm.streamsx.json toolkit can be used to convert SPL schemas to/from JSON. |
| Java | TStream<com.ibm.json.java.JSONObject> | IBM's JSON4J JSONObject class is used to represent a JSON object. |
| Scala | TStream[com.ibm.json.java.JSONObject] | |
| Python | streamsx.topology.schema.CommonSchema.Json | Using CommonSchema.Json on a publish results in a published JSON stream; each Python object is converted to JSON using json.dumps. The Python object must be serializable as JSON; in particular, complex and set objects are not. Using CommonSchema.Json on a subscribe subscribes to any published JSON stream; input tuples are converted to Python dictionaries using json.loads. The subscribed stream in Python has no type associated with it, since all streams in a Python Streams application are just streams of Python objects. |
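The serialization restriction for Python can be checked ahead of time with the standard json module alone. The helper name json_publishable below is hypothetical; it simply attempts the same json.dumps call that the note above describes.

```python
import json


def json_publishable(obj):
    """Return True if json.dumps can serialize obj."""
    try:
        json.dumps(obj)
        return True
    except (TypeError, ValueError):
        # TypeError: unsupported type such as set or complex.
        # ValueError: for example a circular reference.
        return False


print(json_publishable({"route": 12, "stops": [1, 2, 3]}))
print(json_publishable({"stops": {1, 2, 3}}))
print(json_publishable({"z": 1 + 2j}))

# One possible workaround: convert a set to a list before publishing.
print(json.dumps({"stops": sorted({3, 1, 2})}))  # {"stops": [1, 2, 3]}
```

On the subscribing side, json.loads turns each JSON object back into a Python dictionary, so a set published as a list arrives as a list.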

SPL streams

Streams with an arbitrary SPL schema are supported by all languages (some limitations exist for Python), so that a stream with an SPL tuple schema can be subscribed to by SPL, Python, Java and Scala applications.

| Language | Stream type | Notes |
|---|---|---|
| SPL | tuple<type name [, ...]> | |
| Java | SPLStream | Each tuple on the stream is converted to an instance of com.ibm.streams.operator.Tuple, the standard IBM Streams representation of an SPL tuple in Java. SPLStream is strongly typed to a schema through its getSchema() method. |
| Scala | SPLStream | As for Java. |
| Python | streamsx.topology.schema.StreamSchema('tuple<type name [, ...]>') | Python only supports subscribing to SPL streams; publishing is not yet supported. Only a limited number of SPL types are supported. Each tuple is converted to a Python dictionary whose keys are the attribute names and whose values are the corresponding attribute values. |
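The dictionary form that a Python subscriber sees can be pictured with the short sketch below. It is not the toolkit's conversion code; the helper name spl_tuple_to_dict and the attribute names are hypothetical and only show the resulting shape of the data.

```python
def spl_tuple_to_dict(attribute_names, values):
    """Pair each attribute name of the schema with the tuple's value."""
    if len(attribute_names) != len(values):
        raise ValueError("attribute/value count mismatch")
    return dict(zip(attribute_names, values))


# Hypothetical schema: tuple<rstring id, float64 latitude, float64 longitude>
names = ["id", "latitude", "longitude"]
location = spl_tuple_to_dict(names, ["B7", 37.77, -122.42])

print(location["id"])
print(location["latitude"])
```

Downstream Python functions then access attributes by key, for example location["id"].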

String streams

A stream is a sequence of String objects, for example the output from an SPL FileSource operator.

| Language | Stream type | Notes |
|---|---|---|
| SPL | com.ibm.streamsx.topology::String or tuple<rstring string> | Schemas are structurally equivalent and thus may be used to represent String streams for publish-subscribe. UTF-8 encoding is used for the rstring. |
| Java | TStream<String> | Java's natural string representation is used (java.lang.String). |
| Scala | TStream[String] | |
| Python | streamsx.topology.schema.CommonSchema.String | Using CommonSchema.String on a publish results in a published String stream; each Python object is converted to a string using str(tuple). Using CommonSchema.String on a subscribe subscribes to any published String stream; input tuples are converted to Python Unicode strings. |

XML streams

A stream is a sequence of XML documents.

| Language | Stream type | Notes |
|---|---|---|
| SPL | com.ibm.streamsx.topology::XML or tuple<xml document> | Schemas are structurally equivalent and thus may be used to represent XML streams for publish-subscribe. |
| Java | TStream<com.ibm.streams.operator.types.XML> | The Java Operator API representation for an XML attribute is used to hold the XML document in Java and Scala. |
| Scala | TStream[com.ibm.streams.operator.types.XML] | |
| Python | Not yet supported. | |

Blob streams

A stream is a sequence of blobs or binary data.

| Language | Stream type | Notes |
|---|---|---|
| SPL | com.ibm.streamsx.topology::Blob or tuple<blob binary> | Schemas are structurally equivalent and thus may be used to represent blob streams for publish-subscribe. |
| Java | TStream<com.ibm.streams.operator.types.Blob> | The Java Operator API representation for a blob attribute is used to hold the binary data in Java and Scala. |
| Scala | TStream[com.ibm.streams.operator.types.Blob] | |
| Python | Not yet supported. | |
