Introduction
The Flink DataStream API is a flexible way to create Flink jobs with fine-grained control over the processing. However, when a Flink job comes under heavy load, performance issues may appear, and some of them can be caused by serialization or deserialization overhead. This article explains the causes of this overhead and the possible ways to improve processing performance.
The need for serialization of objects
In a nutshell, Flink needs to serialize objects to transfer data through the network between TaskManagers, or to persist these objects in state.
At its core, a Flink job is composed of Flink operators that exchange data through data streams. These data streams are strongly typed and convey two kinds of stream records, namely objects and watermarks. The objects must match the type of the data stream and must be serializable by Flink. Watermarks are always serializable.
In addition, Flink operators can be either stateless or stateful. Stateful operators can store and retrieve data from state that lives in memory, on a local disk, on a persistent volume claim or in a scalable storage service. With a disk-based state backend such as RocksDB, a stateful operator serializes objects each time it stores them in state and deserializes them each time it reads or updates them. With a heap-based backend, objects are kept as Java objects and are serialized when a checkpoint or savepoint is taken.
Finally, the Flink DataStream API lets you register timers to perform calculations at specific points in event time or processing time. Those timers also end up in state and are therefore serialized as well.

However, serialization can quickly become a bottleneck for Flink operators if it is not carefully kept under control.
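To give a feel for why the choice of serializer matters, here is a JDK-only sketch (not Flink code) that encodes the same event twice: field by field with DataOutputStream, which is similar in spirit to a type-specific serializer that knows the structure upfront, and with generic java.io serialization, which must also write a class descriptor. The SensorEvent and SerializationCost names are illustrative assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical event type, used only for this size comparison.
class SensorEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    final long timestamp;
    final double value;
    final String sensorId;

    SensorEvent(long timestamp, double value, String sensorId) {
        this.timestamp = timestamp;
        this.value = value;
        this.sensorId = sensorId;
    }
}

final class SerializationCost {

    private SerializationCost() { }

    // Writes only the field values, in a fixed order known by both sides.
    // Similar in spirit to a type-specific serializer, but NOT Flink's actual format.
    static byte[] fieldByField(SensorEvent e) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(e.timestamp);  // 8 bytes
            out.writeDouble(e.value);    // 8 bytes
            out.writeUTF(e.sensorId);    // 2-byte length + modified UTF-8 bytes
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    // Generic Java serialization also writes a stream header and a class
    // descriptor (class name, field names and types) before the values.
    static byte[] generic(SensorEvent e) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(e);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }
}
```

For an event with the sensor ID "sensor-1", the field-by-field encoding takes 26 bytes (8 + 8 + 2 + 8), while the generic encoding is noticeably larger. The exact generic size depends on the class and field names, so the sketch only compares the two.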
The aftermath of slow serialization and deserialization
- When a Flink operator spends too much time performing serialization or deserialization, it has less time to perform other computations.
- When a stateful Flink operator deals with a large state and checkpointing is enabled, checkpoints may take too long to complete. In extreme situations, the Flink job is so busy that the TaskManager fails to respond to the JobManager within the expected time, which causes the Flink job to fail.
- A Flink operator under "HIGH" backpressure may indicate that an operator further downstream is slow, possibly because of slow serialization or deserialization operations.
This can be confirmed in the Flink web UI by checking the thread dump of the TaskManager running that downstream operator: Java call stacks dominated by serialization or deserialization methods point to this bottleneck.

The ways to improve serialization operations
Define the appropriate state
Flink provides several built-in state backends: an in-memory (heap-based) one and RocksDB (or ForSt in the upcoming Flink 2.1 release).
The commonly used state types for general purposes are:
- ValueState<T>
- ListState<T>
- MapState<K,V>
Usually, using a primitive or basic type (Long, String, …) or a POJO with primitive properties for the type <T> achieves good serialization performance.
Under the hood, Flink uses internal classes such as PojoSerializer, PojoSerializerSnapshot and PojoSerializerSnapshotData to keep track of the fields and registered subclasses of a POJO, so that it can serialize and deserialize this kind of structure efficiently.
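A basic implementation of a POJO that Flink can handle this way has the following characteristics:
- The class is public and standalone (a top-level class or a static nested class, not a non-static inner class).
- The class has a public no-argument constructor.
- Every non-static, non-transient field is either public and non-final, or accessible through public getter and setter methods that follow the JavaBeans naming conventions.
- The type of each field can itself be handled by a registered Flink serializer.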
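As a minimal sketch, assuming a hypothetical sensor-reading event, the following class follows the POJO characteristics listed above. It is nested as a public static class inside an illustrative holder class (PojoExample) so that the example compiles as a single file; a top-level public class in its own file works the same way.

```java
// Illustrative holder class so that the example compiles as a single file.
final class PojoExample {

    private PojoExample() { }

    // Follows Flink's POJO rules: public, static (standalone) class, public
    // no-argument constructor, and every field is either public and non-final
    // or exposed through a public getter and setter.
    public static class SensorReading {
        public String sensorId;   // public, non-final field: accessed directly
        private long timestamp;   // private field: exposed through a getter and a setter
        private double value;

        // Required no-argument constructor, used when Flink creates instances.
        public SensorReading() { }

        public SensorReading(String sensorId, long timestamp, double value) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.value = value;
        }

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

        public double getValue() { return value; }
        public void setValue(double value) { this.value = value; }
    }
}
```

In a real job, one way to make sure a type does not silently fall back to Kryo is to disable generic types (for example with the pipeline.generic-types: false option), which makes Flink throw an exception whenever a type would be treated as a generic type.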
These state types can be used in custom implementations of Flink rich functions such as RichFlatMapFunction or keyed functions such as KeyedProcessFunction.
- ValueState<T> holds a single value per key, such as a POJO or a List.
Example
private transient ValueState<Pojo> valueState;
public void open(Configuration config) {
    ValueStateDescriptor<Pojo> stateDescriptor =
        new ValueStateDescriptor<>("custom-state-name", Pojo.class);
    valueState = getRuntimeContext().getState(stateDescriptor);
}
- ListState<T> is a collection optimized for appending that can be handled directly within your Flink function: add() appends one element and addAll() appends all the elements of another list, which, with RocksDB, does not require reading back the existing list. Unfortunately, it cannot reorder the elements it holds, since no Java Comparator can be supplied. Should you need such a reordering, you may prefer storing an ordered collection from the Java standard library, for example ValueState<PriorityQueue<T>> or ValueState<ConcurrentSkipListSet<T>>.
Example
private transient ListState<Pojo> listState;
public void open(Configuration config) {
    ListStateDescriptor<Pojo> stateDescriptor =
        new ListStateDescriptor<>("custom-state-name", Pojo.class);
    listState = getRuntimeContext().getListState(stateDescriptor);
}
- MapState<K, V> is a key/value map with a limited set of operations; in particular, it has no size() method. As for ValueState and ListState, primitive types and POJOs achieve good performance for the types <K> and <V>.
Example
private transient MapState<String, Pojo> mapState;
public void open(Configuration config) {
    MapStateDescriptor<String, Pojo> stateDescriptor =
        new MapStateDescriptor<>(
            "custom-state-name",
            TypeInformation.of(String.class),
            TypeInformation.of(Pojo.class)
        );
    mapState = getRuntimeContext().getMapState(stateDescriptor);
}
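The ordered-collection alternative suggested for ListState can be illustrated with plain Java. The JDK-only sketch below (the OrderedBufferExample, Event and ByTimestamp names are hypothetical) gives a PriorityQueue the Comparator that ListState cannot accept, then drains it in timestamp order. The comparator is a named class implementing Serializable rather than a lambda, on the assumption that such a queue would be stored in a ValueState and serialized together with its comparator; check this against your Flink version.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class OrderedBufferExample {

    private OrderedBufferExample() { }

    // Hypothetical event type; in a real job this would be your POJO.
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Named, serializable comparator: the ordering that ListState cannot be given.
    static final class ByTimestamp implements Comparator<Event>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(Event a, Event b) {
            return Long.compare(a.timestamp, b.timestamp);
        }
    }

    // Creates an empty queue ordered by event timestamp.
    static PriorityQueue<Event> newQueue() {
        return new PriorityQueue<>(new ByTimestamp());
    }

    // Removes all events from the queue and returns their payloads in timestamp order.
    static List<String> drainInOrder(PriorityQueue<Event> queue) {
        List<String> payloads = new ArrayList<>();
        while (!queue.isEmpty()) {
            payloads.add(queue.poll().payload);
        }
        return payloads;
    }
}
```

Keep in mind that a ValueState<PriorityQueue<T>> is read and written as a whole, so this design trades ListState's cheap appends for ordering.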
In a rich function or a keyed function, you may end up defining a state in the form MapState<Long, List<T>>, where <T> is a primitive type or a POJO. Flink's type extraction typically treats the List interface as a generic type rather than mapping it to a built-in Flink serializer, so Flink falls back to the Kryo library, which provides serializers that process lists, maps and enums fairly efficiently.
The key of the MapState must not be mixed up with the key of a keyed stream extracted with a KeySelector. The latter is retrieved in a keyed function via Context.getCurrentKey(). This value should never be stored in a state: keyed state is already scoped to the current key, so storing the key again only adds serialization work.
Update the state values only when required
Ideally, within a rich function or a keyed function, the following sequence of operations leads to good performance from a serialization / deserialization standpoint:
- Retrieve the state value, or initialize it if it has not been used yet.
- Perform all the required operations on this local value.
- Store the new value in the state as rarely as possible, ideally once:
o After invoking the Flink Collector.collect(T record) method, or at the end of the function if there is no record to output.
o Before registering an event-time or processing-time timer.
- Remove any unnecessary entry from the state (MapState or ListState) to constrain its size.
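The effect of this sequence can be modeled without Flink. In the sketch below, CountingValueState is a hypothetical stand-in for a ValueState<List<String>> (it is not Flink's API) that merely counts value() reads and update() writes, each of which would imply a deserialization or a serialization with a disk-based backend such as RocksDB. It compares storing the state after each of several modifications made within one call with the recommended read-once, write-once sequence.

```java
import java.util.ArrayList;
import java.util.List;

final class StateAccessPatterns {

    private StateAccessPatterns() { }

    // Hypothetical stand-in for a ValueState<List<String>>; this is NOT Flink's API.
    // It only counts reads and writes, each of which would imply a deserialization
    // or a serialization with a disk-based state backend such as RocksDB.
    static final class CountingValueState<T> {
        private T current;
        int reads;
        int writes;

        T value() {
            reads++;
            return current;
        }

        void update(T newValue) {
            writes++;
            current = newValue;
        }
    }

    // Anti-pattern: reads and writes the state for each modification.
    static void updateAfterEachItem(CountingValueState<List<String>> state, List<String> items) {
        for (String item : items) {
            List<String> buffered = state.value();
            if (buffered == null) {
                buffered = new ArrayList<>();
            }
            buffered.add(item);
            state.update(buffered);
        }
    }

    // Recommended sequence: read (or initialize) once, work locally, write once.
    static void updateOnce(CountingValueState<List<String>> state, List<String> items) {
        List<String> buffered = state.value();
        if (buffered == null) {
            buffered = new ArrayList<>();
        }
        buffered.addAll(items);
        state.update(buffered);
    }
}
```

For three items, the first pattern reads and writes the state three times each, while the second reads and writes it only once, with the same final content.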
Example
The following example showcases the points mentioned above:
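import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Pojo is your own event class, assumed to expose a String getValue() method.
public class KeyedFunction extends KeyedProcessFunction<String, Pojo, String> {
    private static final long serialVersionUID = 1L;

    // Buffers events per event timestamp until the matching timer fires.
    private transient MapState<Long, List<Pojo>> mapState;

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<Long, List<Pojo>> descriptor =
            new MapStateDescriptor<>(
                "map_state_name",
                TypeInformation.of(Long.class),
                TypeInformation.of(new TypeHint<List<Pojo>>() { }));
        mapState = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(Pojo event,
                               KeyedProcessFunction<String, Pojo, String>.Context ctx,
                               Collector<String> out) throws Exception {
        TimerService timerService = ctx.timerService();
        Long eventTimestamp = ctx.timestamp();  // null if no timestamp was assigned
        long currentWatermark = timerService.currentWatermark();
        // Late events (at or behind the watermark) are ignored in this example.
        if (eventTimestamp != null && eventTimestamp > currentWatermark) {
            // Read the state entry once (a single deserialization)...
            List<Pojo> events = mapState.get(eventTimestamp);
            if (events == null) {
                events = new ArrayList<>();
            }
            events.add(event);
            // ...and write it once, before registering the timer.
            mapState.put(eventTimestamp, events);
            timerService.registerEventTimeTimer(eventTimestamp);
        }
    }

    @Override
    public void onTimer(long timestamp,
                        KeyedProcessFunction<String, Pojo, String>.OnTimerContext ctx,
                        Collector<String> out) throws Exception {
        // An event-time timer only fires once the watermark has reached its
        // timestamp, so all events buffered for this timestamp can be emitted.
        List<Pojo> events = mapState.get(timestamp);
        if (events != null) {
            for (Pojo event : events) {
                out.collect(event.getValue());
            }
        }
        // Remove the entry that is no longer needed to keep the state small.
        mapState.remove(timestamp);
    }
}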
Split a monolithic state into smaller ones whenever possible
Each rich function or keyed function can reference one or multiple states.
In other words, you may declare a ValueState, a MapState and a ListState in the same keyed function or rich function and update them separately.
By doing so, Flink serializes and deserializes only the smaller structures that are actually read or updated, rather than a single large structure on every access.
Example
Let's imagine a state holding three distinct values of type Long:
ListState<Long> listState;
This single state could be replaced by three distinct states, each of which can be read and updated independently:
ValueState<Long> state1;
ValueState<Long> state2;
ValueState<Long> state3;
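A rough byte count illustrates the benefit. The sketch below is an approximation written with DataOutputStream, not Flink's actual state format: it assumes the list is stored as a 4-byte element count followed by 8 bytes per Long, and compares it with writing a single Long. StateSplitCost is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

final class StateSplitCost {

    private StateSplitCost() { }

    // Approximation of a whole-list write: a 4-byte element count followed by
    // 8 bytes per Long. This is NOT Flink's actual ListState format.
    static int bytesForListWrite(List<Long> values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(values.size());
            for (long v : values) {
                out.writeLong(v);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.size();
    }

    // Approximation of writing a single ValueState<Long>: 8 bytes.
    static int bytesForValueWrite(long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(value);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.size();
    }
}
```

Under these assumptions, updating one of the three values rewrites 28 bytes with the single ListState<Long> versus 8 bytes with three ValueState<Long>, and reading one value likewise avoids deserializing the other two.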
Conclusion
To achieve good performance for serialization and deserialization of objects exchanged between Flink operators:
- Define a POJO that holds the data you compute once and convey across all Flink operators.
- Use a String field if you really need to hold complex structured data:
o Avoid creating custom serializers, which you would have to maintain as that complex structure evolves (fields added, removed or changing type).
o Avoid compressing data with custom code: it may have an adverse effect with state backends such as RocksDB, which can already perform compression.
To achieve good performance for state serialization and deserialization:
- Keep the state minimal:
o Only store what your application requires to be stateful.
o Remove keys that are no longer necessary.
o Use state time-to-live (TTL) if applicable to your application.
- Keep the structure of the state as simple as possible:
o Use primitive types.
o Use POJOs, with as many primitive-typed fields as possible.
o Avoid nested structures.
o Avoid maps, as they can result in a sparse structure.
o Avoid custom types that cause a fallback to the Kryo library, which is less efficient for class structures it does not know in advance.
- If possible, split a complex state into multiple smaller states within the same keyed process function or rich function.