Written by Dan Debrunner.
Overview
As discussed in this post, Streams dynamic connections allow you to design a Streams application as a set of microservices.
Dynamic connections were originally implemented by the Import and Export operators. However, the com.ibm.streamsx.topology toolkit now provides the publish-subscribe paradigm through the Publish and Subscribe SPL operators and the publish and subscribe functions in the Java and Python APIs.
This is a simplification of the connection mechanism provided by Import/Export.
This article discusses the motivation behind this simplification and gives an overview of how to use publish-subscribe in Java, Scala, SPL and Python applications.
Why use Publish/Subscribe instead of Import/Export?
Publish-subscribe was introduced for three reasons:
- To make the simple case simple when using a microservice architecture.
- To support Java and Python developers. One requirement in supporting other languages was that developers need not have SPL knowledge; requiring them to understand SPL subscription expressions would break that requirement.
- To allow stream connections between streaming applications implemented in different programming languages.
Publish-subscribe thus allows a Streams instance to perform streaming analytics using a collection of microservices (Streams applications) implemented in a variety of languages, so the most suitable programming language can be used for each task. These microservices connect through streams using publish-subscribe. For example, ingest and data-prep microservices might be implemented in SPL for high performance, while a Python microservice executes a scoring algorithm from an existing Python package, and a Java microservice provides connectivity to an external service.
Topics
With publish-subscribe, the microservice interface is a published stream, which is defined by:
- A topic, for example buses/locations.
- A stream type.
Topics are strings following the MQTT topic style, where / is a separator between levels. Subscribers match publishers using a topic pattern and a type. A topic pattern may be an exact match (e.g. buses/locations) or include a wildcard character:
- The + wildcard matches any topic at a single level, e.g. buses/+ matches buses/locations and buses/alerts.
- The # wildcard matches all topics below a level, e.g. buses/# matches buses/locations and buses/updates/routes.
Regardless of the wildcard, a subscriber only matches a topic if the type also matches.
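To make the matching rules concrete, here is a minimal Python sketch of MQTT-style topic matching. It is not the Streams implementation: the helpers topic_matches and subscription_matches are hypothetical, the stream types are modeled as plain labels such as "Json", and it assumes standard MQTT semantics, where + matches exactly one level and # (valid only as the last level) matches the parent level and everything below it.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Hypothetical helper: does an MQTT-style topic pattern match a topic?"""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(pattern_levels):
        if level == "#":
            # '#' matches this level and everything below it, but is
            # only valid as the final level of a pattern.
            return i == len(pattern_levels) - 1
        if i >= len(topic_levels):
            # The pattern has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; '+' matches any one level.
            return False
    # Without '#', pattern and topic must have the same number of levels.
    return len(pattern_levels) == len(topic_levels)


def subscription_matches(pattern: str, sub_type: str,
                         topic: str, pub_type: str) -> bool:
    """Hypothetical helper: a subscriber matches only if topic AND type match."""
    return pub_type == sub_type and topic_matches(pattern, topic)


if __name__ == "__main__":
    print(topic_matches("buses/+", "buses/locations"))        # True
    print(topic_matches("buses/+", "buses/updates/routes"))   # False
    print(topic_matches("buses/#", "buses/updates/routes"))   # True
    print(subscription_matches("buses/#", "String", "buses/alerts", "Json"))  # False
```

The last call shows the type rule: the topic pattern matches, but a String subscriber does not receive a JSON stream.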
Microservice architecture
Apart from the simplification of topic-based matching, publish-subscribe supports the microservice architecture by not requiring the subscriber to know how an application publishes its stream, creating true independence between publishers and subscribers. This includes correct behavior when:
- The publisher, the subscriber, or both are in parallel regions; for example, a stream published from a parallel region of width three is matched correctly whether the subscriber is outside any parallel region or in a parallel region of width two.
- The publishing application either allows or does not allow filters.
With simplification comes some loss of functionality: applications cannot dynamically change their published or subscribed topic. Under the covers, publish-subscribe is implemented using SPL dynamic connections, and the set of properties for the published stream is documented, so sophisticated SPL developers can integrate the full functionality of dynamic connections if desired.
Publish-subscribe with SPL
An SPL application publishes a stream by invoking the com.ibm.streamsx.topology.topic::Publish operator.
use com.ibm.streamsx.topology.topic::Publish;
use com.ibm.streamsx.transportation.vehicle::VehicleLocation;
...
stream<VehicleLocation> Locations = ...
() as PubLoc = Publish(Locations) {
    param
        topic: "buses/locations";
}
An SPL application then subscribes to streams by invoking the com.ibm.streamsx.topology.topic::Subscribe operator.
use com.ibm.streamsx.topology.topic::Subscribe;
use com.ibm.streamsx.transportation.vehicle::VehicleLocation;
...
stream<VehicleLocation> Locs = Subscribe() {
    param
        topic: "buses/locations";
        streamType: VehicleLocation;
}
Note that SPL uses structural type equivalence, so the type definitions used by the publisher and the subscriber need not be the same, but they must be structurally the same for a match. In this case the publisher can use the VehicleLocation type from the com.ibm.streamsx.transportation toolkit, while the subscriber could instead declare the structurally identical tuple type directly, so as not to have a dependency on the transportation toolkit.
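Structural equivalence can be pictured with a small Python model. This sketch assumes, as in SPL, that two tuple types are equivalent when they declare the same attribute names with the same types in the same order, regardless of the name under which each type was declared. TupleType, structurally_equivalent and the attribute list are hypothetical; the real VehicleLocation definition is not reproduced here.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class TupleType:
    """Hypothetical model of an SPL tuple type: the name it was declared
    under plus its ordered (SPL type, attribute name) pairs."""
    declared_name: str
    attributes: Tuple[Tuple[str, str], ...]


def structurally_equivalent(a: TupleType, b: TupleType) -> bool:
    """Assumed rule: same attribute names and types, in the same order.
    The declared name plays no part in the comparison."""
    return a.attributes == b.attributes


# Hypothetical attributes standing in for the toolkit's VehicleLocation type.
ATTRS = (("rstring", "id"), ("float64", "latitude"), ("float64", "longitude"))

publisher_type = TupleType("VehicleLocation", ATTRS)
subscriber_type = TupleType("tuple", ATTRS)  # declared inline, no toolkit dependency
reordered_type = TupleType("tuple", (ATTRS[1], ATTRS[0], ATTRS[2]))

print(structurally_equivalent(publisher_type, subscriber_type))  # True
print(structurally_equivalent(publisher_type, reordered_type))   # False
```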
Publish-subscribe with Java application API
A Java Streams application publishes a TStream or SPLStream by invoking the publish method. For a TStream, the type of the stream is the type of the Java object; for example, TStream<Bus> publishes a stream of Java com.buscompany.vehicles.Bus objects.
import com.ibm.streamsx.topology.TStream;
import com.buscompany.vehicles.Bus;
...
TStream<Bus> stream = ...
stream.publish("buses");
A Java Streams application subscribes to a stream using the Topology.subscribe method, providing a topic pattern and a type:
import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.Topology;
import com.buscompany.vehicles.Bus;
...
TStream<Bus> stream = topology.subscribe("buses", Bus.class);
When a stream is published using a Java type, a subscription matches on the exact class only: a published stream with the same topic but a subclass of com.buscompany.vehicles.Bus would not be matched by this subscription. If the Java stream is an SPLStream, the published type is the stream's SPL schema.
import com.ibm.streamsx.topology.spl.SPLStream;
...
SPLStream alerts = ...
// type of the published stream is alerts.getSchema()
alerts.publish("buses/alerts");
A Java Streams application subscribes to a stream with an SPL schema type using SPLStreams.subscribe():
StreamSchema ALERT_SCHEMA = ...
SPLStream alerts = SPLStreams.subscribe(topology, "buses/alerts", ALERT_SCHEMA);
Scala applications use the same methods as Java applications for publish and subscribe.
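The exact-class rule for Java subscriptions can be illustrated with a small in-memory registry. This sketch is written in Python only as an analogy and is not how Streams implements matching: the classes Bus and ElectricBus and the TopicRegistry type are hypothetical, and matching uses class identity (is) rather than issubclass(), so a published subclass does not satisfy a subscription for its base class.

```python
from typing import List, Tuple


class Bus:
    """Hypothetical stand-in for com.buscompany.vehicles.Bus."""


class ElectricBus(Bus):
    """Hypothetical subclass of Bus."""


class TopicRegistry:
    """Hypothetical in-memory record of published (topic, type) pairs."""

    def __init__(self) -> None:
        self._published: List[Tuple[str, type]] = []

    def publish(self, topic: str, stream_type: type) -> None:
        self._published.append((topic, stream_type))

    def matches(self, topic: str, stream_type: type) -> List[Tuple[str, type]]:
        # Exact class identity ('is'), not issubclass(): a published
        # subclass does not satisfy a subscription for its base class.
        return [(t, cls) for (t, cls) in self._published
                if t == topic and cls is stream_type]


registry = TopicRegistry()
registry.publish("buses", Bus)
registry.publish("buses", ElectricBus)
print(len(registry.matches("buses", Bus)))  # 1: only the exact Bus stream
```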
Publish-subscribe with Python application API
A Python Streams application publishes a Stream by calling its publish function:
stream = ...
stream.publish('buses/efficiency')
By default the published type is Python object, so any Python Streams application can subscribe to the stream:
stream = topology.subscribe('buses/efficiency')
A Python stream may be published as a stream of strings or JSON objects by using the optional schema argument.
from streamsx.topology.schema import CommonSchema
# each tuple (Python object) is published as a string using str(tuple)
stream.publish('buses/efficiency/strings', schema=CommonSchema.String)
# each tuple (Python object) is published as JSON using json.dumps(tuple)
stream.publish('buses/efficiency/json', schema=CommonSchema.Json)
Subscribe also takes an optional schema parameter to specify the stream type:
# Subscribe to a published stream with type String
stream = topology.subscribe('buses/efficiency', schema=CommonSchema.String)
Published streams with arbitrary SPL schemas can be subscribed to:
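from streamsx.topology.schema import StreamSchema
# Subscribe to a published stream with an SPL tuple type
# (the tuple<...> schema text is elided here)
schema = StreamSchema('tuple<...>')
stream = topology.subscribe('buses/efficiency', schema)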
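The difference between the String and Json publish options is easiest to see on a single tuple. This sketch runs, in plain Python and without any Streams API, the two conversions described above for the schema argument (str() and json.dumps()); the dictionary contents are hypothetical.

```python
import json

# Hypothetical tuple flowing on a Python stream (any Python object).
tup = {"route": "22", "mpg": 7.5}

# schema=CommonSchema.String: the tuple is published as str(tuple).
as_string = str(tup)

# schema=CommonSchema.Json: the tuple is published as json.dumps(tuple).
as_json = json.dumps(tup)

print(as_string)  # {'route': '22', 'mpg': 7.5}
print(as_json)    # {"route": "22", "mpg": 7.5}

# str() produces Python syntax (single quotes), which is not valid JSON,
# whereas the JSON text parses back into an equal dictionary.
print(json.loads(as_json) == tup)  # True
```

This is why Json is usually the better choice when subscribers may be written in SPL or Java.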
Publish-subscribe across languages
Using the microservice architecture, a complete Streams system can contain multiple applications interacting with each other through the publish-subscribe model, and each application can be implemented in the programming language best suited for its task. Publish-subscribe works across programming languages, allowing:
- the publisher and subscriber to be implemented in different languages
- applications implemented in different languages to publish to the same topic and type
- applications implemented in different languages to subscribe to the same topic and type
A publish-subscribe stream type is programming language independent if it is one of the following types:
- JSON - A stream of JSON objects.
- SPL schema - A stream of tuples with an arbitrary SPL schema.
- String - A stream of string objects (e.g. lines from a file).
- Binary - A stream of binary objects.
- XML - A stream of XML documents.
Once a stream is published in any of these formats, any application can consume it (some limitations currently exist for Python applications). For example, a TStream<JSONObject> published by Java can be subscribed to by an SPL application using a Json schema, or by a Python application using CommonSchema.Json. A stream of Java objects published by Java or Scala can only be subscribed to by Java and Scala applications. A stream of Python objects can only be subscribed to by Python applications.
JSON streams
JSON is a common data interchange format and is therefore a natural publish format that allows subscription by any application.
Language | Stream type | Notes
SPL | com.ibm.streamsx.topology::Json, com.ibm.streamsx.json::Json, tuple<rstring jsonString> | All these schemas are structurally equivalent, so any of them may be used to represent JSON streams for publish-subscribe. Operators and functions from the com.ibm.streamsx.json toolkit can be used to convert SPL schemas to and from JSON.
Java | TStream<com.ibm.json.java.JSONObject> | The JSONObject class from IBM's JSON4J library is used to represent a JSON object.
Scala | TStream[com.ibm.json.java.JSONObject] |
Python | streamsx.topology.schema.CommonSchema.Json | Using CommonSchema.Json on a publish results in a published JSON stream: each Python object is converted to JSON using json.dumps. The Python object must be serializable as JSON; in particular, complex and set objects are not. Using CommonSchema.Json on a subscribe subscribes to any published JSON stream: input tuples are converted to Python dictionaries using json.loads. The subscribed stream in Python has no type associated with it, since all streams in a Python Streams application are just streams of Python objects.
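The two caveats in the Python row can be checked directly with the standard json module. This sketch needs no Streams API: it shows that a set or complex value makes json.dumps raise TypeError, and that json.loads turns received JSON text back into a plain dictionary. The helper to_json_or_none and the sample values are hypothetical.

```python
import json
from typing import Any, Optional


def to_json_or_none(obj: Any) -> Optional[str]:
    """Hypothetical helper: return the JSON text for obj, or None if obj
    cannot be serialized (for example, it contains a set or complex)."""
    try:
        return json.dumps(obj)
    except TypeError:
        return None


print(to_json_or_none({"bus": "A12", "speed": 31.5}))  # {"bus": "A12", "speed": 31.5}
print(to_json_or_none({"stops": {1, 2, 3}}))           # None (set)
print(to_json_or_none({"z": 1 + 2j}))                  # None (complex)

# Subscriber side: each incoming JSON string becomes a Python dict.
received = json.loads('{"bus": "A12", "speed": 31.5}')
print(type(received).__name__, received["speed"])      # dict 31.5
```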
SPL streams
Streams with an arbitrary SPL schema are supported by all languages (some limitations exist for Python), so a stream with any SPL tuple schema can be subscribed to by SPL, Python, Java and Scala applications.
Language | Stream type | Notes
SPL | tuple<type name [, ...]> |
Java | SPLStream | Each tuple on the stream is converted to an instance of com.ibm.streams.operator.Tuple, the standard IBM Streams representation of an SPL tuple in Java. SPLStream is strongly typed to a schema through its getSchema() method.
Scala | SPLStream |
Python | streamsx.topology.schema.StreamSchema('tuple<type name [, ...]>') | Python supports only subscribing to SPL streams; publishing is not yet supported. Only a limited set of SPL types is supported. Each tuple is converted to a Python dictionary whose keys are the attribute names and whose values are the corresponding attribute values.
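The dictionary form a Python subscriber sees can be sketched without Streams. This Python sketch is an assumption-laden illustration, not the toolkit's conversion code: the helpers attribute_names and to_python_dict are hypothetical, the tuple's values are assumed to arrive in attribute order, and only flat schemas (no nested list<...> or map<...> types) are parsed.

```python
from typing import Any, Dict, List, Sequence


def attribute_names(schema: str) -> List[str]:
    """Hypothetical helper: extract attribute names from a flat SPL schema
    string such as 'tuple<rstring id, float64 mpg>'. Nested or composite
    types (e.g. list<...> or map<...>) are deliberately not handled."""
    if not (schema.startswith("tuple<") and schema.endswith(">")):
        raise ValueError("expected 'tuple<...>'")
    body = schema[len("tuple<"):-1]
    # Each attribute is '<type> <name>'; the name is the last token.
    return [attr.split()[-1] for attr in body.split(",")]


def to_python_dict(schema: str, values: Sequence[Any]) -> Dict[str, Any]:
    """Map positional attribute values onto their attribute names."""
    names = attribute_names(schema)
    if len(names) != len(values):
        raise ValueError("value count does not match schema")
    return dict(zip(names, values))


schema = "tuple<rstring id, float64 mpg>"    # hypothetical schema
print(to_python_dict(schema, ["A12", 7.5]))  # {'id': 'A12', 'mpg': 7.5}
```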
String streams
A stream is a sequence of String objects, for example the output from an SPL FileSource operator.
Language | Stream type | Notes
SPL | com.ibm.streamsx.topology::String, tuple<rstring string> | The schemas are structurally equivalent, so either may be used to represent String streams for publish-subscribe. UTF-8 encoding is used for the rstring.
Java | TStream<String> | Java's natural string representation (java.lang.String) is used.
Scala | TStream[String] |
Python | streamsx.topology.schema.CommonSchema.String | Using CommonSchema.String on a publish results in a published String stream: each Python object is converted to a string using str(tuple). Using CommonSchema.String on a subscribe subscribes to any published String stream: input tuples are converted to Python Unicode strings.
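Because the SPL rstring in a String stream is UTF-8 encoded, non-ASCII text survives the trip between languages as long as each side encodes and decodes UTF-8. This minimal Python sketch only simulates that round trip with str.encode and bytes.decode; it does not use any Streams API, and the sample text is hypothetical.

```python
# Hypothetical line of text, e.g. as read by a FileSource operator.
line = "Route 7: Zürich HB"

# Publisher side: the rstring attribute carries UTF-8 bytes.
payload = line.encode("utf-8")

# Subscriber side: a Python subscriber sees a Unicode str again.
received = payload.decode("utf-8")

print(len(line), len(payload))  # 18 characters, 19 bytes ('ü' takes two bytes)
print(received == line)         # True
```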
XML streams
A stream is a sequence of XML documents.
Language | Stream type | Notes
SPL | com.ibm.streamsx.topology::XML, tuple<xml document> | The schemas are structurally equivalent, so either may be used to represent XML streams for publish-subscribe.
Java | TStream<com.ibm.streams.operator.types.XML> | The Java Operator API representation of an XML attribute is used to hold the XML document in Java and Scala.
Scala | TStream[com.ibm.streams.operator.types.XML] |
Python | | Not yet supported.
Blob streams
A stream is a sequence of blobs or binary data.
Language | Stream type | Notes
SPL | com.ibm.streamsx.topology::Blob, tuple<blob binary> | The schemas are structurally equivalent, so either may be used to represent blob streams for publish-subscribe.
Java | TStream<com.ibm.streams.operator.types.Blob> | The Java Operator API representation of a blob attribute is used to hold the binary data in Java and Scala.
Scala | TStream[com.ibm.streams.operator.types.Blob] |
Python | | Not yet supported.