I believe I had a somewhat similar problem that Matthias helped me figure out:

https://lists.apache.org/thread.html/8ff62b50146ed38c2358aa835eb08136e7a5743bf5fc33563c10cead@%3Cusers.kafka.apache.org%3E

I am using Kafka Streams 0.11.0.1, and what I did is set
https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG
to 0.
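As a concrete illustration, a minimal sketch of that global setting (the
application id and bootstrap servers are placeholders, not from the thread):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "diff-app");          // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // 0 disables record caching, so every update is forwarded downstream
    // instead of being buffered until the next commit/flush
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);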
Hope that helps!

Puneet

On Sat, Dec 23, 2017 at 11:00 AM, Cedric BERTRAND <bertrandcedric....@gmail.com> wrote:

> Thanks a lot.
> I will test next week.
> You saved me a lot of time.
>
> On Sat, Dec 23, 2017 at 21:26, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> I see what you mean by "lose messages" now. They are not lost, but
>> subject to caching and overwrites.
>>
>> You should disable the KTable cache for your application, either globally
>> by setting the cache size to zero in StreamsConfig, or locally (requires
>> Kafka 1.0) via the aggregate() parameter `Materialized#withCachingDisabled()`.
>>
>> See the docs for more details:
>> https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
>>
>> -Matthias
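For the local option, a minimal sketch of an aggregation with caching
disabled (assumes Kafka 1.0 APIs; MyAggregate, myAggregateSerde, and the
store name are placeholders, not from the thread):

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    KTable<String, MyAggregate> table = stream
        .groupByKey()
        .aggregate(
            MyAggregate::new,                        // initializer
            (key, value, agg) -> agg.update(value),  // placeholder aggregator
            Materialized.<String, MyAggregate, KeyValueStore<Bytes, byte[]>>as("diff-store")
                .withValueSerde(myAggregateSerde)    // placeholder serde
                .withCachingDisabled());             // forward every single update downstream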
>> On 12/23/17 1:28 AM, Cedric BERTRAND wrote:
>>
>>> Thanks for your answer.
>>>
>>> The aggregation followed by a mapValues was my first implementation, but
>>> I lose some messages in the mapValues step.
>>>
>>> When I say:
>>>
>>>>> But when I am reading the KTable, I have no guarantee to see all messages
>>>>> with the same key (because of the commit.interval.ms configuration).
>>>
>>> I mean:
>>> If I have 10 messages with the same key going into the aggregation, I
>>> need to see 10 messages with the diff after the aggregation.
>>> If the messages are coming fast (within the same second, for example), I
>>> get fewer than 10 messages.
>>>
>>> Because I am not able to do the commit myself, it sounds like only the
>>> last message with a key is seen by the mapValues step (and my diff
>>> operation has a wrong result).
>>>
>>> If I change the commit.interval.ms (30 seconds by default) to 200 ms, I
>>> can see more messages, but I am still not sure to see all of them.
>>>
>>> Setting the commit.interval.ms to a very small value might be a solution,
>>> but what about the performance?
>>>
>>> Thanks
>>>
>>> Cedric
>>>
>>> On Fri, Dec 22, 2017 at 21:39, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>>> Hi,
>>>>
>>>> What I don't understand is what you mean by
>>>>
>>>>>> But when I am reading the KTable, I have no guarantee to see all messages
>>>>>> with the same key (because of the commit.interval.ms configuration).
>>>>
>>>> Can you elaborate? I actually think an aggregation should be the correct
>>>> operator to use. However, you would need to return a pair with the old/new
>>>> value and do a consecutive mapValues that computes the diff over both --
>>>> i.e., it's a two-step approach instead of a single aggregation.
>>>>
>>>> About the Producer solution: this is indeed not recommended. As for the
>>>> exception itself, it's a known bug, and a fix was proposed via KIP-91.
>>>>
>>>> -Matthias
>>>>
>>>> On 12/22/17 5:26 AM, Cedric BERTRAND wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have been using Kafka Streams for some months, and I have not managed
>>>>> to implement this use case:
>>>>>
>>>>> I need to compare two objects: a new one and the old one.
>>>>> At the end, I need to send a message with the diff between the two objects.
>>>>>
>>>>> My first implementation was to use an aggregate and to return the diff
>>>>> in the resulting KTable.
>>>>> But when I am reading the KTable, I have no guarantee to see all messages
>>>>> with the same key (because of the commit.interval.ms configuration).
>>>>>
>>>>> I have tried another implementation (see the code below).
>>>>> I compute the diff in the aggregate method, and I send the message with
>>>>> a KafkaProducer that is created outside Kafka Streams.
>>>>> By doing this, my code is working, but under heavy load I get the
>>>>> following error message (many, many times):
>>>>>
>>>>> org.apache.kafka.common.errors.TimeoutException: Expiring 16 record(s)
>>>>> for vision360-events-client-00-12: 38949 ms has passed since batch
>>>>> creation plus linger time
>>>>>
>>>>> The fact that the Producer is created outside the DAG of Kafka Streams
>>>>> seems to be a bad implementation.
>>>>>
>>>>> final Producer<K, V> producer = ....
>>>>>
>>>>> stream
>>>>>     .groupByKey()
>>>>>     .aggregate(
>>>>>         () -> ...,
>>>>>         (key, value, aggregate) -> {
>>>>>             ...
>>>>>             producer.send(new ProducerRecord<>(topic, key, myValue),
>>>>>                 (metadata, exception) -> {
>>>>>                     if (exception != null) {
>>>>>                         LOGGER.error(MessageFormatter.format(
>>>>>                             "Failed to send event : {}", key).getMessage(),
>>>>>                             exception);
>>>>>                     }
>>>>>                 });
>>>>>             return myAggregateValue;
>>>>>         },
>>>>>         ...,
>>>>>         ....
>>>>>     )
>>>>>     .mapValues(...)
>>>>>     .to(topic);
>>>>>
>>>>> Is there a good way to do this in Kafka:
>>>>> an aggregation that sends a message with a diff for every message with
>>>>> the same key?
>>>>>
>>>>> Thanks
>>>>>
>>>>> Cedric

--
Regards,
Puneet
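For reference, a minimal sketch of the two-step approach Matthias suggests
(an aggregate that keeps the old/new pair, followed by a mapValues that
computes the diff), assuming Kafka 1.0 APIs and caching disabled as
discussed; OldNewPair, MyValue, Diff, computeDiff, pairSerde, and the
store/topic names are placeholders, not from the thread:

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Placeholder pair type holding the previous and current value for a key
    public class OldNewPair {
        public final MyValue oldValue;
        public final MyValue newValue;
        public OldNewPair(MyValue oldValue, MyValue newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    KStream<String, Diff> diffs = stream
        .groupByKey()
        .aggregate(
            () -> new OldNewPair(null, null),                     // no history for the key yet
            (key, value, pair) -> new OldNewPair(pair.newValue, value), // shift current to old
            Materialized.<String, OldNewPair, KeyValueStore<Bytes, byte[]>>as("old-new-store")
                .withValueSerde(pairSerde)                        // placeholder serde
                .withCachingDisabled())                           // one update per input record
        .toStream()
        .mapValues(pair -> computeDiff(pair.oldValue, pair.newValue)); // placeholder diff function

    diffs.to("diff-topic");                                       // placeholder output topic

With caching disabled, each input record produces exactly one downstream
update, so ten inputs for a key yield ten diffs, which is the guarantee
asked for above, and no external KafkaProducer is needed.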