Thanks for your answer. The aggregation followed by a mapValues was my first
implementation, but I lose some messages in the mapValues step (roughly the
shape sketched below).
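To make the shape concrete, here is a minimal sketch of that
aggregate-then-mapValues approach (the OldNewPair type, the computeDiff
helper, the String value types, and the topic names are simplified
placeholders, not the real code):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;

public class DiffTopologySketch {

    // Placeholder container keeping the previous and the current value per key.
    public static class OldNewPair {
        public final String oldValue;
        public final String newValue;

        public OldNewPair(String oldValue, String newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        // Move the current value to "old" and store the incoming one as "new".
        public OldNewPair shift(String incoming) {
            return new OldNewPair(this.newValue, incoming);
        }
    }

    // Placeholder diff: the real code would compare two domain objects.
    static String computeDiff(String oldValue, String newValue) {
        return "old=" + oldValue + ", new=" + newValue;
    }

    public static void buildTopology(StreamsBuilder builder, Serde<OldNewPair> pairSerde) {
        builder.<String, String>stream("input-topic")
            .groupByKey()
            // Step 1: the aggregate only keeps the (old, new) pair for each key.
            .aggregate(
                () -> new OldNewPair(null, null),
                (key, value, pair) -> pair.shift(value),
                Materialized.with(Serdes.String(), pairSerde))
            .toStream()
            // Step 2: a consecutive mapValues computes the diff from the pair.
            .mapValues(pair -> computeDiff(pair.oldValue, pair.newValue))
            .to("diff-topic");
    }
}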
When I say:

>> But when I am reading the KTable, I have no guarantee to see all messages
>> with the same key (because of the commit.interval.ms configuration).

I mean: if I have 10 messages with the same key going into the aggregation, I
need to see 10 messages with the diff after the aggregation. If the messages
arrive quickly (within the same second, for example), I get fewer than 10
messages. Because I am not able to trigger the commit myself, it looks like
only the last message for a given key is seen by the mapValues step (so my
diff operation has a wrong result).

If I change commit.interval.ms (30 seconds by default) to 200 ms, I see more
messages, but I don't think I am guaranteed to see all of them. Setting
commit.interval.ms to a very small value might be a solution, but what about
the performance?

Thanks

Cedric

On Fri, Dec 22, 2017 at 21:39, Matthias J. Sax <matth...@confluent.io> wrote:

> Hi,
>
> What I don't understand is what you mean by
>
> >> But when I am reading the KTable, I have no guarantee to see all
> >> messages with the same key (because of the commit.interval.ms
> >> configuration).
>
> Can you elaborate? I actually think an aggregation should be the correct
> operator to use. However, you would need to return a pair with the old/new
> values and do a consecutive mapValues that computes the diff over both --
> i.e., it's a two-step approach instead of a single aggregation.
>
> About the Producer solution: this is indeed not recommended. For the
> exception itself, it's a known bug and a fix was proposed via KIP-91.
>
>
> -Matthias
>
> On 12/22/17 5:26 AM, Cedric BERTRAND wrote:
> > Hello,
> >
> > I have been using Kafka Streams for some months and I have not managed
> > to implement this use case:
> >
> > I need to compare two objects: a new one and the old one.
> > At the end, I need to send a message with the diff between the two
> > objects.
> >
> > My first implementation was to use an aggregate and to return the diff
> > in the resulting KTable.
> > But when I am reading the KTable, I have no guarantee to see all
> > messages with the same key (because of the commit.interval.ms
> > configuration).
> >
> > I have tried another implementation (see the code below).
> > I compute the diff in the aggregate method and I send the message with
> > a KafkaProducer that is created outside Kafka Streams.
> > By doing this, my code works, but under heavy load I get the following
> > error message (many, many times):
> >
> > org.apache.kafka.common.errors.TimeoutException: Expiring 16 record(s)
> > for vision360-events-client-00-12: 38949 ms has passed since batch
> > creation plus linger time
> >
> > The fact that the Producer is created outside the DAG of Kafka Streams
> > seems to be a bad implementation.
> >
> >
> > final Producer<K, V> producer = ....
> >
> > stream
> >     .groupByKey()
> >     .aggregate(
> >         () -> ...,
> >         (key, value, aggregate) -> {
> >             ...
> >             producer.send(new ProducerRecord<>(topic, key, myValue),
> >                 (metadata, exception) -> {
> >                     if (exception != null) {
> >                         LOGGER.error(MessageFormatter.format(
> >                             "Failed to send event : {}", key).getMessage(),
> >                             exception);
> >                     }
> >                 });
> >             return myAggregateValue;
> >         },
> >         ...,
> >         ....
> >     )
> >     .mapValues(...)
> >     .to(topic);
> >
> > Is there a good way to do this in Kafka:
> > an aggregation that sends a message with the diff for every message
> > with the same key?
> >
> >
> > Thanks
> >
> > Cedric
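For reference, a minimal sketch of how the settings discussed above can be
adjusted in the Streams configuration (the 200 ms value is just the one tried
above; cache.max.bytes.buffering is the size of the record cache, which also
affects how often KTable updates are forwarded downstream; the application id
and broker address are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsConfigSketch {

    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "diff-app");          // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        // commit.interval.ms: 30000 ms by default; 200 ms is the value tried above.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200);

        // cache.max.bytes.buffering: setting the record cache to 0 disables caching,
        // so every KTable update is forwarded downstream (at the cost of more output).
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        return props;
    }
}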