Hello,
We wrote a very simple streaming pipeline containing:
1. Kafka consumer
2. Process function
3. Kafka producer
The code of the process function is listed below:
    private transient MapState<String, Object> testMapState;

    @Override
    public void processElement(Map<String, Object> value, Context ctx,
            Collector<Map<String, Object>> out) throws Exception {
        if (testMapState.isEmpty()) {
            testMapState.putAll(value);
            out.collect(value);
            testMapState.clear();
        }
    }
We saw very poor performance, so we profiled the job with JProfiler.
The profiler showed that the hot spots are two MapState methods:
1. isEmpty() - around 7 ms
2. clear() - around 4 ms
We had to switch to ValueState instead.
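For comparison, here is a minimal sketch of what the ValueState variant of the function above might look like. It is not our exact job code. To keep it runnable without a cluster, the ValueState interface and the InMemoryValueState class below are hypothetical stand-ins that only mirror the value(), update() and clear() methods of Flink's ValueState, and a plain List stands in for the Collector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ValueStateSketch {

    // Hypothetical stand-in that mirrors the method names of Flink's
    // ValueState<T>. In a real job the state would be obtained via
    // getRuntimeContext().getState(new ValueStateDescriptor<>(...)).
    interface ValueState<T> {
        T value();
        void update(T value);
        void clear();
    }

    // Hypothetical in-memory implementation, used only to exercise the logic.
    static final class InMemoryValueState<T> implements ValueState<T> {
        private T current;
        @Override public T value() { return current; }
        @Override public void update(T value) { current = value; }
        @Override public void clear() { current = null; }
    }

    // Same control flow as processElement above, but the whole map is
    // stored as one value instead of as individual MapState entries.
    static void processElement(Map<String, Object> value,
                               ValueState<Map<String, Object>> state,
                               List<Map<String, Object>> out) {
        if (state.value() == null) {   // replaces testMapState.isEmpty()
            state.update(value);       // replaces testMapState.putAll(value)
            out.add(value);            // replaces out.collect(value)
            state.clear();             // replaces testMapState.clear()
        }
    }
}
```

With this shape, the emptiness check becomes a single null check on value() rather than a call to isEmpty() on the map state.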
Are we using MapState correctly, or are we doing something wrong?
Is this behaviour expected? We ask because Flink's recommendation is to
use MapState rather than ValueState.
BR,
Nick