Mikhail Dubrovin created KAFKA-13681:
----------------------------------------
Summary: Event duplicates for partition-stuck kafka-stream
application
Key: KAFKA-13681
URL: https://issues.apache.org/jira/browse/KAFKA-13681
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.8.1
Reporter: Mikhail Dubrovin
Attachments: fail_topology.txt
Hello,
We found the following unpredictable behavior of Kafka streams:
{code:java}
public void buildStreams(final BuilderHelper builder) {
KTable<TableId, TableValue> table = builder.table();
KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable =
workflowTable(builder);
table
.mapValues(value -> mappers.mainDTO(value))
.leftJoin(workflowTable, mappers::joinWorkflows)
.toStream()
.map((key, value) -> KeyValue.pair(
AggregateId.newBuilder().setId(value.getId()).build(),
mappers.aggregateDTO(value)))
.peek((k, v) -> logSinkRecord(v))
.filter((id, dto) -> !isReprocessing)
.to(...);
}
private static KTable<TableId, ArrayList<InternalWorkflowDTO>>
workflowTable(BuilderHelper builderHelper) {
return builderHelper.workflowTable()
.groupBy((id, workflow) -> KeyValue.pair(
TableId.newBuilder().setId(workflow.getTableId()).build(),
mappers.mapWorkflow(workflow)),
Grouped.with(...))
.aggregate(ArrayList::new, (key, value, agg) -> {
agg.add(value);
return agg;
}, (key, value, agg) -> {
agg.remove(value);
return agg;
}, Materialized.with(...));
} {code}
it is a small part of our topology but it shows the error flow.
*Data structure:*
We have two many-partition topics: entity and workflow. Every topic is
represented as KTable.
*Data error that causes application shutdown:*
Our final event(join the entity and workflow ktables) expects a not-null field
in the entity but for some reason, it comes for one event. The whole aggregator
fails in _mappers.aggregateDTO(value)_ of the _buildStreams_ method
We have a health check which restarts the aggregator if it fails.
When incorrect data comes to one partition, the partition processing is stuck
but other partitions are processed.
It causes that at every restart, _workflowTable_ topology repeats .aggregate()
add/remove flows and puts new List into the repartition topic. But offsets are
not moved for processed partitions due to the aggregator's shutdown.
_This behavior generates/sinks a lot of final entity duplicates at every
restart because the flow is successful for data from a not-corrupted partition
but offsets are not moved for them._
And it also causes troubles if @EqualsAndHashCode is defined to use all fields
to compare. At every restart, the topology tries to remove the old value(not
existing after the first run) and adds a new value at the end of the list. The
list grows after each restart(contains the same - new value values).
I also attached the topology description. To visualize:
[https://zz85.github.io/kafka-streams-viz/]
*Current workaround:*
To redefine @EqualsAndHashCode to use entities' ids only.
*Not solved issue:*
Sink events duplication at every restart.
Thank you in advance!
--
This message was sent by Atlassian Jira
(v8.20.1#820001)