Hello, I'm using KafkaStream for some months and I don't succeed to implement this use case :
I need to compare two objects : a new one and the old one. At the end, I need to send a message with the diff between the 2 objets. My first implementation was to use an aggregate and to return the diff in the resulting KTable. But when I am reading the KTable, I have no guarantee to see all messages with the same key (because of the commit.interval.ms configuration). I have tryed another implementation (see the code below). I compute the diff in the aggregate method and I send the message in a KafkaProducer that is created outside KafkaStream. By doing this, my code is working but with heavy load, I got the following error message (many many times) : org.apache.kafka.common.errors.TimeoutException: Expiring 16 record(s) for vision360-events-client-00-12: 38949 ms has passed since batch creation plus linger time The fact that the Producer is created outside the DAG of KafkaStream seem to be a bad implementation. final Producer<K, V> producer = .... stream .groupByKey() .aggregate( () -> ..., (key, value, aggregate) -> { ... producer.send(new ProducerRecord<>(topic, key, myValue), (metadata, exception) -> { if (exception!= null) { LOGGER.error(MessageFormatter.format("Failed to send event : {}", key).getMessage(), exception); } }); return myAggregateValue ; }, ..., .... ) .mapValues( ...) .to(topic); Is there a good way to do this in Kafka : To do an aggregation that send a message with a diff for all messages with the same key. Thanks Cedric