[
https://issues.apache.org/jira/browse/KAFKA-13681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mikhail Dubrovin updated KAFKA-13681:
-------------------------------------
Summary: Sink event duplicates for partition-stuck stream application
(was: Event duplicates for partition-stuck kafka-stream application)
> Sink event duplicates for partition-stuck stream application
> ------------------------------------------------------------
>
> Key: KAFKA-13681
> URL: https://issues.apache.org/jira/browse/KAFKA-13681
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.1
> Reporter: Mikhail Dubrovin
> Priority: Major
> Attachments: fail_topology.txt
>
>
> Hello,
> We found the following unpredictable behavior in Kafka Streams:
> {code:java}
> public void buildStreams(final BuilderHelper builder) {
>     KTable<TableId, TableValue> table = builder.table();
>     KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable = workflowTable(builder);
>
>     table
>         .mapValues(value -> mappers.mainDTO(value))
>         .leftJoin(workflowTable, mappers::joinWorkflows)
>         .toStream()
>         .map((key, value) -> KeyValue.pair(
>             AggregateId.newBuilder().setId(value.getId()).build(),
>             mappers.aggregateDTO(value)))
>         .peek((k, v) -> logSinkRecord(v))
>         .filter((id, dto) -> !isReprocessing)
>         .to(...);
> }
>
> private static KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable(BuilderHelper builderHelper) {
>     return builderHelper.workflowTable()
>         .groupBy((id, workflow) -> KeyValue.pair(
>                 TableId.newBuilder().setId(workflow.getTableId()).build(),
>                 mappers.mapWorkflow(workflow)),
>             Grouped.with(...))
>         .aggregate(
>             ArrayList::new,            // initializer
>             (key, value, agg) -> {     // adder
>                 agg.add(value);
>                 return agg;
>             },
>             (key, value, agg) -> {     // subtractor
>                 agg.remove(value);
>                 return agg;
>             },
>             Materialized.with(...));
> }
> {code}
> This is only a small part of our topology, but it shows the failing flow.
> *Data structure:*
> We have two multi-partition topics: entity and workflow. Each topic is
> represented as a KTable.
> *Data error that causes application shutdown:*
> Our final event (a join of the entity and workflow KTables) expects a
> non-null field in the entity, but for one event that field arrives as null.
> The whole aggregator fails in _mappers.aggregateDTO(value)_ of the
> _buildStreams_ method.
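> For illustration, a defensive guard before the mapping step would avoid the
> crash. This is only a sketch of the flow above; _getRequiredField()_ is a
> hypothetical accessor standing in for the field that arrives as null:
> {code:java}
> // Sketch only: drop events whose required field is missing instead of
> // letting mappers.aggregateDTO(value) throw and kill the instance.
> // getRequiredField() is a hypothetical accessor, not our real schema.
> table
>     .mapValues(value -> mappers.mainDTO(value))
>     .leftJoin(workflowTable, mappers::joinWorkflows)
>     .toStream()
>     .filter((key, value) -> value.getRequiredField() != null) // guard
>     .map((key, value) -> KeyValue.pair(
>         AggregateId.newBuilder().setId(value.getId()).build(),
>         mappers.aggregateDTO(value)))
>     .to(...);
> {code}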
> We have a health check that restarts the aggregator when it fails.
> When incorrect data arrives on one partition, processing of that partition
> gets stuck, while the other partitions continue to be processed.
> As a result, on every restart the _workflowTable_ topology repeats the
> .aggregate() add/remove flow and puts a new List into the repartition topic,
> but offsets are not committed for the already-processed partitions because
> of the aggregator's shutdown.
> _This behavior sinks many duplicates of the final entity at every restart:
> the flow succeeds for data from the non-corrupted partitions, but their
> offsets are never committed._
> It also causes trouble if @EqualsAndHashCode is defined to compare all
> fields. At every restart, the topology tries to remove the old value (which
> no longer exists after the first run) and appends the new value to the end
> of the list, so the list grows after each restart, accumulating copies of
> the same value.
> I also attached the topology description. It can be visualized with
> [https://zz85.github.io/kafka-streams-viz/].
> *Current workaround:*
> Redefine @EqualsAndHashCode to compare entities by id only, as sketched
> below.
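> A minimal sketch of that workaround with Lombok; the field names are
> illustrative, not our real schema:
> {code:java}
> // Restrict equals/hashCode to the id so the aggregate subtractor can
> // find and remove the old value even when other fields have changed.
> @lombok.EqualsAndHashCode(onlyExplicitlyIncluded = true)
> public class InternalWorkflowDTO {
>     @lombok.EqualsAndHashCode.Include
>     private String id;      // the only field compared
>     private String payload; // illustrative; ignored by equals/hashCode
> }
> {code}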
> *Not solved issue:*
> Sink event duplication at every restart.
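> A possible mitigation we have not verified is exactly-once processing,
> which commits produced records and consumed offsets atomically (downstream
> consumers would then need isolation.level=read_committed); a sketch:
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.streams.StreamsConfig;
>
> // Unverified sketch: with exactly-once, output records and input offsets
> // are committed in a single transaction, so records from the healthy
> // partitions are not re-produced after a crash-and-restart cycle.
> Properties props = new Properties();
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
> {code}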
> Thank you in advance!
--
This message was sent by Atlassian Jira
(v8.20.1#820001)