Thanks a lot.
I will test next week.
You saved me a lot of time.

On Sat, Dec 23, 2017 at 21:26, Matthias J. Sax <matth...@confluent.io> wrote:

> I see what you mean by "lose messages" now. They are not lost but
> subject to caching and overwrites.
>
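To make "caching and overwrites" concrete, here is a minimal sketch of the effect in plain Java, with no Kafka dependency. It is a simplified model, not the actual Kafka Streams cache: the class `CacheOverwriteDemo` and its methods are hypothetical names, and `flush()` stands in for what happens at a commit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model (assumption): a per-key cache that forwards only the
// latest value of each key downstream when it is flushed.
final class CacheOverwriteDemo {
    private final Map<String, Integer> cache = new LinkedHashMap<>();
    private final List<String> downstream = new ArrayList<>();

    // A new update for a key overwrites the one already sitting in the cache.
    void update(String key, int value) {
        cache.put(key, value);
    }

    // Stand-in for a commit: emit one record per cached key, then clear.
    void flush() {
        for (Map.Entry<String, Integer> entry : cache.entrySet()) {
            downstream.add(entry.getKey() + "=" + entry.getValue());
        }
        cache.clear();
    }

    List<String> downstream() {
        return downstream;
    }
}
```

In this model, ten updates for the same key followed by a single flush produce one downstream record holding the last value, which matches the "fewer than 10 messages" symptom described in the thread.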
> You should disable the KTable cache for your application, either globally by
> setting the cache size to zero in StreamsConfig, or locally (requires Kafka
> 1.0) via the aggregate() parameter `Materialized#withCachingDisabled()`.
>
> See the docs for more details:
> https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
>
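For the global option, a minimal sketch of the settings could look like the following. It uses plain string keys so that it compiles without the Kafka jars; `cache.max.bytes.buffering` is the cache size setting in the Kafka Streams versions discussed in this thread, and `StreamsConfig` exposes constants for the same keys. The class name `NoCacheConfig` and the values `diff-app` and `localhost:9092` are placeholders, not taken from the original application.

```java
import java.util.Properties;

// Sketch: build Kafka Streams settings with the record cache turned off.
final class NoCacheConfig {
    // Cache size key as a plain string (StreamsConfig has a constant for it).
    static final String CACHE_BYTES = "cache.max.bytes.buffering";

    static Properties withCachingDisabled(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // A cache size of zero disables caching for the whole application.
        props.put(CACHE_BYTES, "0");
        return props;
    }
}
```

The resulting `Properties` object would then be passed to the `KafkaStreams` constructor, for example `NoCacheConfig.withCachingDisabled("diff-app", "localhost:9092")`.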
>
> -Matthias
>
> On 12/23/17 1:28 AM, Cedric BERTRAND wrote:
> > Thanks for your answer.
> >
> > The aggregation followed by a mapValues was my first implementation, but I
> > lose some messages in the mapValues step.
> >
> > When I say:
> >>> But when I am reading the KTable, I have no guarantee to see all messages
> >>> with the same key (because of the commit.interval.ms configuration).
> >
> > I mean:
> > If I have 10 messages with the same key going into the aggregation, I
> > need to see 10 messages with the diff after the aggregation.
> > If the messages arrive quickly (within the same second, for example), I get
> > fewer than 10 messages.
> >
> > Because I am not able to trigger the commit myself, it sounds like only the
> > last message for a key is seen by the mapValues step (and my diff operation
> > gives a wrong result).
> >
> > If I change commit.interval.ms (30 seconds by default) to 200 ms, I can
> > see more messages, but I am not sure I see all of them.
> >
> > Setting commit.interval.ms to a very small value might be a solution,
> > but what about performance?
> >
> > Thanks
> >
> > Cedric
> >
> >
> >
> > On Fri, Dec 22, 2017 at 21:39, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Hi,
> >>
> >> what I don't understand is what you mean by
> >>
> >>>> But when I am reading the KTable, I have no guarantee to see all messages
> >>>> with the same key (because of the commit.interval.ms configuration).
> >>
> >> Can you elaborate? I actually think an aggregation should be the correct
> >> operator to use. However, you would need to return a pair with the old/new
> >> values and do a subsequent mapValues that computes the diff over both,
> >> i.e., it's a two-step approach instead of a single aggregation.
> >>
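The two-step idea can be sketched in plain Java, independent of the Kafka Streams API. This is only an illustration under stated assumptions: the values are integers, the "diff" is a numeric difference, a missing old value counts as 0, and the names `OldNewDiff`, `OldNew`, `aggregate`, `diff` and `diffsFor` are all hypothetical. In a real topology, `aggregate` would be the body of the aggregator and `diff` the body of the mapValues step.

```java
import java.util.ArrayList;
import java.util.List;

final class OldNewDiff {
    // Aggregate value: the previous and the latest value seen for a key.
    static final class OldNew {
        final Integer oldValue; // null until a second record arrives
        final Integer newValue; // null before the first record

        OldNew(Integer oldValue, Integer newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    // Step 1 (aggregator): the current "new" becomes "old", the input becomes "new".
    static OldNew aggregate(Integer value, OldNew current) {
        return new OldNew(current.newValue, value);
    }

    // Step 2 (mapValues): compute the diff from the pair.
    // Assumption: a missing old value is treated as 0.
    static int diff(OldNew pair) {
        int previous = pair.oldValue == null ? 0 : pair.oldValue;
        return pair.newValue - previous;
    }

    // Simulates both steps for a single key when every update is forwarded,
    // i.e. with caching disabled: one diff per input value.
    static List<Integer> diffsFor(List<Integer> values) {
        OldNew state = new OldNew(null, null); // plays the role of the initializer
        List<Integer> out = new ArrayList<>();
        for (Integer value : values) {
            state = aggregate(value, state);
            out.add(diff(state));
        }
        return out;
    }
}
```

Keeping the old value inside the aggregate means the diff never depends on state outside the topology, which is why no external producer is needed.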
> >> About the Producer solution: this is indeed not recommended. As for the
> >> exception itself, it's a known bug and a fix was proposed via KIP-91.
> >>
> >>
> >> -Matthias
> >>
> >> On 12/22/17 5:26 AM, Cedric BERTRAND wrote:
> >>> Hello,
> >>>
> >>> I have been using Kafka Streams for some months and I have not managed to
> >>> implement this use case:
> >>>
> >>> I need to compare two objects: a new one and the old one.
> >>> At the end, I need to send a message with the diff between the two objects.
> >>>
> >>> My first implementation was to use an aggregate and to return the diff in
> >>> the resulting KTable.
> >>> But when I am reading the KTable, I have no guarantee to see all messages
> >>> with the same key (because of the commit.interval.ms configuration).
> >>>
> >>> I have tried another implementation (see the code below).
> >>> I compute the diff in the aggregate method and I send the message with a
> >>> KafkaProducer that is created outside Kafka Streams.
> >>> With this, my code works, but under heavy load I get the following
> >>> error message (many, many times):
> >>>
> >>> org.apache.kafka.common.errors.TimeoutException: Expiring 16 record(s) for
> >>> vision360-events-client-00-12: 38949 ms has passed since batch creation
> >>> plus linger time
> >>>
> >>> Creating the Producer outside the Kafka Streams DAG seems
> >>> to be a bad implementation.
> >>>
> >>>
> >>> final Producer<K, V> producer = ....
> >>>
> >>> stream
> >>>     .groupByKey()
> >>>     .aggregate(
> >>>         () -> ...,
> >>>         (key, value, aggregate) -> {
> >>>             ...
> >>>             producer.send(new ProducerRecord<>(topic, key, myValue),
> >>>                 (metadata, exception) -> {
> >>>                     if (exception != null) {
> >>>                         LOGGER.error(MessageFormatter.format(
> >>>                             "Failed to send event : {}", key).getMessage(), exception);
> >>>                     }
> >>>                 });
> >>>             return myAggregateValue;
> >>>         },
> >>>         ...,
> >>>         ....
> >>>     )
> >>>     .mapValues( ...)
> >>>     .to(topic);
> >>>
> >>> Is there a good way to do this in Kafka, i.e. an aggregation that sends
> >>> a message with a diff for every message with the same key?
> >>>
> >>>
> >>> Thanks
> >>>
> >>> Cedric
> >>>
> >>
> >>
> >
>
>
