I see what you mean by "lose messages" now. They are not lost, but subject to caching and overwrites: the KTable cache keeps only the latest value per key and forwards it downstream when it is flushed, so intermediate updates get collapsed.
You should disable the KTable cache for your application: either globally, by setting the cache size to zero in StreamsConfig, or locally (requires Kafka 1.0) via the `Materialized#withCachingDisabled()` parameter of `aggregate()`. See the docs for more details:
https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
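A minimal sketch of both options, assuming the Kafka 1.0 API (the topic name, store name, serdes, and the counting aggregator below are placeholders for illustration, not part of the original thread):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Option 1: disable all record caching globally by setting the cache size to zero.
    Properties props = new Properties();
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);

    // Option 2 (Kafka 1.0+): disable caching only for this aggregation via Materialized.
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder.stream("input-topic");

    KTable<String, Long> counts = stream
        .groupByKey()
        .aggregate(
            () -> 0L,                                  // initializer
            (key, value, aggregate) -> aggregate + 1L, // aggregator (placeholder logic)
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("agg-store")
                .withValueSerde(Serdes.Long())
                .withCachingDisabled());

With caching disabled (either way), every single update to the KTable is forwarded downstream, instead of only the latest value per key at flush time.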
-Matthias

On 12/23/17 1:28 AM, Cedric BERTRAND wrote:
> Thanks for your answer.
>
> The aggregation followed by a mapValues was my first implementation, but I
> lose some messages in the mapValues step.
>
> When I say:
>>> But when I am reading the KTable, I have no guarantee to see all messages
>>> with the same key (because of the commit.interval.ms configuration).
>
> I mean:
> If I have 10 messages with the same key going into the aggregation, I
> need to see 10 messages with the diff after the aggregation.
> If the messages arrive quickly (within the same second, for example), I get
> fewer than 10 messages.
>
> Because I am not able to trigger the commit myself, it sounds like only the
> last message for a key is seen by the mapValues step (and my diff operation
> has a wrong result).
>
> If I change commit.interval.ms (30 seconds by default) to 200 ms, I can
> see more messages, but I am still not sure to see all of them.
>
> Setting commit.interval.ms to a very small value might be a solution,
> but what about the performance?
>
> Thanks
>
> Cedric
>
>
> On Fri, Dec 22, 2017 at 21:39, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Hi,
>>
>> what I don't understand is what you mean by
>>
>>>> But when I am reading the KTable, I have no guarantee to see all
>>>> messages with the same key (because of the commit.interval.ms
>>>> configuration).
>>
>> Can you elaborate? I actually think an aggregation should be the correct
>> operator to use. However, you would need to return a pair with the old and
>> new value and do a consecutive mapValues that computes the diff over both --
>> i.e., it's a two-step approach instead of a single aggregation.
>>
>> About the Producer solution: this is indeed not recommended. As for the
>> exception itself, it's a known bug and a fix was proposed via KIP-91.
>>
>>
>> -Matthias
>>
>> On 12/22/17 5:26 AM, Cedric BERTRAND wrote:
>>> Hello,
>>>
>>> I have been using Kafka Streams for some months and I have not managed to
>>> implement this use case:
>>>
>>> I need to compare two objects: a new one and the old one.
>>> At the end, I need to send a message with the diff between the two objects.
>>>
>>> My first implementation was to use an aggregate and to return the diff in
>>> the resulting KTable.
>>> But when I am reading the KTable, I have no guarantee to see all messages
>>> with the same key (because of the commit.interval.ms configuration).
>>>
>>> I have tried another implementation (see the code below).
>>> I compute the diff in the aggregate method and I send the message with a
>>> KafkaProducer that is created outside Kafka Streams.
>>> By doing this, my code works, but under heavy load I get the following
>>> error message (many, many times):
>>>
>>> org.apache.kafka.common.errors.TimeoutException: Expiring 16 record(s) for
>>> vision360-events-client-00-12: 38949 ms has passed since batch creation
>>> plus linger time
>>>
>>> The fact that the Producer is created outside the DAG of Kafka Streams
>>> seems to be a bad implementation.
>>>
>>>
>>> final Producer<K, V> producer = ....
>>>
>>> stream
>>>     .groupByKey()
>>>     .aggregate(
>>>         () -> ...,
>>>         (key, value, aggregate) -> {
>>>             ...
>>>             producer.send(new ProducerRecord<>(topic, key, myValue),
>>>                 (metadata, exception) -> {
>>>                     if (exception != null) {
>>>                         LOGGER.error(MessageFormatter.format(
>>>                             "Failed to send event : {}", key).getMessage(),
>>>                             exception);
>>>                     }
>>>                 });
>>>             return myAggregateValue;
>>>         },
>>>         ...,
>>>         ....
>>>     )
>>>     .mapValues( ... )
>>>     .to(topic);
>>>
>>> Is there a good way to do this in Kafka:
>>> an aggregation that sends a message with the diff for every message with
>>> the same key?
>>>
>>>
>>> Thanks
>>>
>>> Cedric
>>>
>>
>>
>
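For completeness, here is a rough sketch of the two-step approach described in the quoted reply above (the aggregation keeps the previous and the current value per key, and a consecutive mapValues computes the diff). The OldAndNew container, the oldAndNewSerde, and the diff() helper are hypothetical placeholders, and caching must be disabled as discussed so that every intermediate update actually reaches mapValues:

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Hypothetical container for the previous and the current value of a key.
    class OldAndNew {
        final String oldValue;
        final String newValue;
        OldAndNew(String oldValue, String newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    // Placeholder diff computation -- replace with the real comparison logic.
    static String diff(String oldValue, String newValue) {
        return "old=" + oldValue + ", new=" + newValue;
    }

    // Step 1: the aggregation only shifts values -- the previous "new" becomes
    // "old", and the incoming record becomes "new". 'stream' is the same
    // KStream<String, String> as in the sketch further up.
    Serde<OldAndNew> oldAndNewSerde = ...; // custom serde, assumed to exist
    KTable<String, OldAndNew> pairs = stream
        .groupByKey()
        .aggregate(
            () -> new OldAndNew(null, null),
            (key, value, agg) -> new OldAndNew(agg.newValue, value),
            Materialized.<String, OldAndNew, KeyValueStore<Bytes, byte[]>>as("pair-store")
                .withValueSerde(oldAndNewSerde)
                .withCachingDisabled()); // so every update reaches mapValues

    // Step 2: compute the diff downstream and forward it to the output topic.
    pairs
        .toStream()
        .mapValues(pair -> diff(pair.oldValue, pair.newValue))
        .to("diff-topic", Produced.with(Serdes.String(), Serdes.String()));

This keeps the whole flow inside the Kafka Streams topology, so the external KafkaProducer (and the TimeoutException it produced under load) is no longer needed.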