Thanks for your answer.

An aggregation followed by a mapValues was my first implementation, but I
lose some messages in the mapValues step.

When I say:
>> But when I am reading the KTable, I have no guarantee to see all messages
>> with the same key (because of the commit.interval.ms configuration).

I mean:
If 10 messages with the same key go into the aggregation, I need to see
10 messages, each with its diff, after the aggregation.
If the messages arrive quickly (within the same second, for example), I get
fewer than 10 messages.
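
A toy model of this effect, in plain Java rather than Kafka Streams, may make the symptom concrete. It assumes that pending updates for a key are buffered and overwritten until a flush, which is how the KTable record cache is documented to behave; the class and method names are hypothetical and nothing here calls the Kafka API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Kafka code: shows how buffering updates per key
// until a flush hides intermediate values from the downstream step.
public class CacheDedupSketch {

    // Returns the updates a downstream step (e.g. mapValues) would observe.
    static List<String> downstream(List<String[]> updates, boolean cacheEnabled) {
        List<String> seen = new ArrayList<>();
        Map<String, String> cache = new LinkedHashMap<>();
        for (String[] kv : updates) {
            if (cacheEnabled) {
                // A newer value overwrites the pending one for the same key.
                cache.put(kv[0], kv[1]);
            } else {
                // Without buffering, every update is forwarded immediately.
                seen.add(kv[0] + "=" + kv[1]);
            }
        }
        // Single flush at the end, standing in for the commit interval.
        for (Map.Entry<String, String> e : cache.entrySet()) {
            seen.add(e.getKey() + "=" + e.getValue());
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String[]> updates = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            updates.add(new String[] {"k", "v" + i});
        }
        System.out.println(downstream(updates, true).size());  // prints 1
        System.out.println(downstream(updates, false).size()); // prints 10
    }
}
```

With buffering on, the ten updates for key "k" collapse into the single latest value, so a diff computed downstream compares against the wrong predecessor.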

Because I am not able to trigger the commit myself, it seems that only the
last message for a key is seen by the mapValues step (and my diff operation
gives a wrong result).

If I change commit.interval.ms (30 seconds by default) to 200 ms, I see
more messages, but I don't think I am guaranteed to see all of them.

Setting commit.interval.ms to a very small value might be a solution,
but what about performance?
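
For reference, a minimal sketch of the two settings that seem relevant, written as a plain java.util.Properties object. The key commit.interval.ms is the one discussed above; cache.max.bytes.buffering is an assumption on my side: it is the Kafka Streams setting for the record cache size in the 1.0-era releases, and setting it to 0 is documented to disable the cache so that every update is forwarded downstream. The class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: builds only the two settings discussed here.
// A real application would also need application.id, bootstrap.servers, etc.
public class StreamsCacheConfigSketch {

    static Properties everyUpdateProps() {
        Properties props = new Properties();
        // Shorter commit interval: the cache is flushed more often.
        props.setProperty("commit.interval.ms", "200");
        // Assumption: a cache size of 0 disables the record cache, so
        // downstream steps see each update instead of the latest per key.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = everyUpdateProps();
        System.out.println(props.getProperty("commit.interval.ms"));
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
    }
}
```

The expected trade-off is more records written downstream and to the changelog topic, so the performance question would still need to be measured under real load.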

Thanks

Cedric



On Fri, Dec 22, 2017 at 21:39, Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi,
>
> what I don't understand is what you mean by
>
> >> But when I am reading the KTable, I have no guarantee to see all messages
> >> with the same key (because of the commit.interval.ms configuration).
>
> Can you elaborate? I actually think an aggregation should be the correct
> operator to use. However, you would need to return a pair with the old and
> new value and apply a subsequent mapValues that computes the diff between
> the two, i.e., a two-step approach instead of a single aggregation.
>
> About the Producer solution: this is indeed not recommended. As for the
> exception itself, it's a known bug and a fix was proposed via KIP-91.
>
>
> -Matthias
>
> On 12/22/17 5:26 AM, Cedric BERTRAND wrote:
> > Hello,
> >
> > I have been using Kafka Streams for some months and I have not managed
> > to implement this use case:
> >
> > I need to compare two objects: a new one and the old one.
> > At the end, I need to send a message with the diff between the two objects.
> >
> > My first implementation was to use an aggregate and to return the diff in
> > the resulting KTable.
> > But when I am reading the KTable, I have no guarantee to see all messages
> > with the same key (because of the commit.interval.ms configuration).
> >
> > I have tried another implementation (see the code below).
> > I compute the diff in the aggregate method and I send the message with a
> > KafkaProducer that is created outside Kafka Streams.
> > With this approach my code works, but under heavy load I get the
> > following error message (many, many times):
> >
> > org.apache.kafka.common.errors.TimeoutException: Expiring 16 record(s) for
> > vision360-events-client-00-12: 38949 ms has passed since batch creation
> > plus linger time
> >
> > Creating the Producer outside the Kafka Streams DAG seems to be a bad
> > design.
> >
> >
> > final Producer<K, V> producer = ....
> >
> > stream
> >     .groupByKey()
> >     .aggregate(
> >         () -> ...,
> >         (key, value, aggregate) -> {
> >             ...
> >             producer.send(new ProducerRecord<>(topic, key, myValue),
> >                 (metadata, exception) -> {
> >                     if (exception != null) {
> >                         LOGGER.error(MessageFormatter.format("Failed to send event : {}", key).getMessage(), exception);
> >                     }
> >                 });
> >             return myAggregateValue;
> >         },
> >         ...,
> >         ....
> >     )
> >     .mapValues( ...)
> >     .to(topic);
> >
> > Is there a good way to do this in Kafka: an aggregation that sends a
> > message with a diff for every message with the same key?
> >
> >
> > Thanks
> >
> > Cedric
> >
>
>
