Thanks a lot. I will test next week. You saved me a lot of time.

On Sat, Dec 23, 2017 at 21:26, Matthias J. Sax <matth...@confluent.io> wrote:
> I see what you mean by "lose messages" now. They are not lost, but
> subject to caching and overwrites.
>
> You should disable the KTable cache for your application: either
> globally, by setting the cache size to zero in StreamsConfig, or
> locally (requires Kafka 1.0) via the aggregate() parameter
> `Materialized#withCachingDisabled()`.
>
> See the docs for more details:
> https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
>
>
> -Matthias
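To illustrate the two options described above, here is a minimal sketch. It is not from the original thread: the application id, bootstrap servers, topic and store names, and the string-concatenation aggregator are placeholders.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class CachingDisabledSketch {

        public static void main(String[] args) {
            // Option 1 (global): a zero-byte record cache means every update
            // is forwarded downstream instead of being deduplicated before
            // the next commit.
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "diff-app");          // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            // With the cache disabled, commit.interval.ms only controls how
            // often offsets/state are committed, not what is forwarded.

            // Option 2 (local, Kafka 1.0+): disable caching for this one store
            // through the Materialized parameter of aggregate().
            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("input-topic")
                .groupByKey()
                .aggregate(
                    () -> "",                                     // placeholder initializer
                    (key, value, aggregate) -> aggregate + value, // placeholder aggregator
                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("agg-store")
                        .withCachingDisabled());
        }
    }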
> On 12/23/17 1:28 AM, Cedric BERTRAND wrote:
> > Thanks for your answer.
> >
> > An aggregation followed by a mapValues was my first implementation,
> > but I lose some messages in the mapValues step.
> >
> > When I say:
> >>> But when I am reading the KTable, I have no guarantee to see all
> >>> messages with the same key (because of the commit.interval.ms
> >>> configuration).
> >
> > I mean:
> > If I have 10 messages with the same key going into the aggregation, I
> > need to see 10 messages with the diff after the aggregation.
> > If the messages arrive quickly (within the same second, for example),
> > I get fewer than 10 messages.
> >
> > Because I am not able to trigger the commit myself, it sounds like
> > only the last message for a key is seen by the mapValues step (and my
> > diff operation has a wrong result).
> >
> > If I change commit.interval.ms (30 seconds by default) to 200 ms, I
> > can see more messages, but I don't think I am guaranteed to see all
> > of them.
> >
> > Setting commit.interval.ms to a very small value might be a solution,
> > but what about the performance?
> >
> > Thanks
> >
> > Cedric
> >
> >
> >
> > On Fri, Dec 22, 2017 at 21:39, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Hi,
> >>
> >> what I don't understand is what you mean by
> >>
> >>>> But when I am reading the KTable, I have no guarantee to see all
> >>>> messages with the same key (because of the commit.interval.ms
> >>>> configuration).
> >>
> >> Can you elaborate? I actually think an aggregation should be the
> >> correct operator to use. However, you would need to return a pair
> >> with the old/new value and do a consecutive mapValues that computes
> >> the diff over both -- i.e., it's a two-step approach instead of a
> >> single aggregation.
> >>
> >> About the Producer solution: this is indeed not recommended. As for
> >> the exception itself, it's a known bug and a fix was proposed via
> >> KIP-91.
> >>
> >>
> >> -Matthias
> >>
> >> On 12/22/17 5:26 AM, Cedric BERTRAND wrote:
> >>> Hello,
> >>>
> >>> I have been using Kafka Streams for some months and I have not
> >>> managed to implement this use case:
> >>>
> >>> I need to compare two objects: a new one and the old one.
> >>> At the end, I need to send a message with the diff between the two
> >>> objects.
> >>>
> >>> My first implementation was to use an aggregate and to return the
> >>> diff in the resulting KTable.
> >>> But when I am reading the KTable, I have no guarantee to see all
> >>> messages with the same key (because of the commit.interval.ms
> >>> configuration).
> >>>
> >>> I have tried another implementation (see the code below).
> >>> I compute the diff in the aggregate method and I send the message
> >>> with a KafkaProducer that is created outside Kafka Streams.
> >>> By doing this, my code works, but under heavy load I get the
> >>> following error message (many, many times):
> >>>
> >>> org.apache.kafka.common.errors.TimeoutException: Expiring 16
> >>> record(s) for vision360-events-client-00-12: 38949 ms has passed
> >>> since batch creation plus linger time
> >>>
> >>> The fact that the Producer is created outside the DAG of Kafka
> >>> Streams seems to be a bad implementation.
> >>>
> >>>
> >>> final Producer<K, V> producer = ....
> >>>
> >>> stream
> >>>     .groupByKey()
> >>>     .aggregate(
> >>>         () -> ...,
> >>>         (key, value, aggregate) -> {
> >>>             ...
> >>>             producer.send(new ProducerRecord<>(topic, key, myValue),
> >>>                 (metadata, exception) -> {
> >>>                     if (exception != null) {
> >>>                         LOGGER.error(MessageFormatter.format(
> >>>                             "Failed to send event : {}", key).getMessage(),
> >>>                             exception);
> >>>                     }
> >>>                 });
> >>>             return myAggregateValue;
> >>>         },
> >>>         ...,
> >>>         ....
> >>>     )
> >>>     .mapValues(...)
> >>>     .to(topic);
> >>>
> >>> Is there a good way to do this in Kafka:
> >>> an aggregation that sends a message with a diff for every message
> >>> with the same key?
> >>>
> >>>
> >>> Thanks
> >>>
> >>> Cedric
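For completeness, the two-step approach Matthias describes above (aggregate into an old/new pair, then a consecutive mapValues that computes the diff) could look roughly like the sketch below. OldNewPair, computeDiff, and the topic and store names are hypothetical, and a Serde for OldNewPair would still have to be provided.

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class DiffSketch {

        // Hypothetical holder for the previous and current value of a key.
        public static class OldNewPair {
            public final String oldValue;
            public final String newValue;
            public OldNewPair(String oldValue, String newValue) {
                this.oldValue = oldValue;
                this.newValue = newValue;
            }
        }

        // Hypothetical diff function; return whatever representation you need.
        static String computeDiff(String oldValue, String newValue) {
            return "old=" + oldValue + ", new=" + newValue;
        }

        public static void buildTopology(StreamsBuilder builder) {
            builder.<String, String>stream("objects-topic")
                .groupByKey()
                // Step 1: the aggregate only remembers the previous and the
                // current value for the key; it does not compute the diff.
                .aggregate(
                    () -> new OldNewPair(null, null),
                    (key, value, pair) -> new OldNewPair(pair.newValue, value),
                    // caching disabled so every single update reaches mapValues;
                    // a Serde for OldNewPair must also be configured (omitted here)
                    Materialized.<String, OldNewPair, KeyValueStore<Bytes, byte[]>>as("old-new-store")
                        .withCachingDisabled())
                // Step 2: a consecutive mapValues turns each old/new pair into a diff,
                // which is then produced by Kafka Streams itself -- no external Producer.
                .toStream()
                .mapValues(pair -> computeDiff(pair.oldValue, pair.newValue))
                .to("diffs-topic");
        }
    }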