I believe I had a similar problem, which Matthias helped me
figure out.

https://lists.apache.org/thread.html/8ff62b50146ed38c2358aa835eb08136e7a5743bf5fc33563c10cead@%3Cusers.kafka.apache.org%3E

I am using Kafka Streams 0.11.0.1, and what I did was set
CACHE_MAX_BYTES_BUFFERING_CONFIG
(https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG)
to 0.
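
In case it helps, a minimal sketch of that setting (the application id
and bootstrap servers here are placeholders, not from this thread):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-diff-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // a cache size of 0 disables record caching, so every update is
    // forwarded downstream instead of being compacted in the cache
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);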

Hope that helps!
Puneet

On Sat, Dec 23, 2017 at 11:00 AM, Cedric BERTRAND <
bertrandcedric....@gmail.com> wrote:

> Thanks a lot.
> I will test next week.
> You saved me a lot of time.
>
> On Sat, Dec 23, 2017 at 9:26 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I see what you mean by "lose messages" now. They are not lost but
> > subject to caching and overwrites.
> >
> > You should disable the KTable cache for your application, either
> > globally by setting the cache size to zero in StreamsConfig, or locally
> > (requires Kafka 1.0) via the aggregate() parameter
> > `Materialized#withCachingDisabled()`.
> >
> > See the docs for more details:
> > https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
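> >
> > A minimal sketch of both options (the store name and generics are
> > placeholders; the local variant assumes the Kafka 1.0 API):
> >
> >     // global: in the StreamsConfig properties
> >     props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >
> >     // local: per store, on the aggregation itself
> >     groupedStream.aggregate(
> >         initializer,
> >         aggregator,
> >         Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as("diff-store")
> >             .withCachingDisabled());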
> >
> >
> > -Matthias
> >
> > On 12/23/17 1:28 AM, Cedric BERTRAND wrote:
> > > Thanks for your answer.
> > >
> > > The aggregation followed by a mapValues was my first implementation,
> > > but I lose some messages in the mapValues step.
> > >
> > > When I say:
> > >>> But when I am reading the KTable, I have no guarantee to see all
> > >>> messages with the same key (because of the commit.interval.ms
> > >>> configuration).
> > >
> > > I mean:
> > > If I have 10 messages with the same key going into the aggregation, I
> > > need to see 10 messages with the diff after the aggregation.
> > > If the messages are coming fast (within the same second, for example),
> > > I get fewer than 10 messages.
> > >
> > > Because I am not able to trigger the commit myself, it sounds like
> > > only the last message for a key is seen by the mapValues step (and my
> > > diff operation has a wrong result).
> > >
> > > If I change commit.interval.ms (30 seconds by default) to 200 ms, I
> > > can see more messages, but I am still not sure I see every message.
> > >
> > > Setting commit.interval.ms to a very small value might be a solution,
> > > but what about the performance?
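> > >
> > > (For reference, that knob is StreamsConfig.COMMIT_INTERVAL_MS_CONFIG;
> > > a hypothetical 200 ms setting would look like
> > >
> > >     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200);
> > >
> > > though, as Matthias notes above, disabling the cache is the cleaner
> > > fix.)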
> > >
> > > Thanks
> > >
> > > Cedric
> > >
> > >
> > >
> > > On Fri, Dec 22, 2017 at 9:39 PM, Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> what I don't understand is what you mean by
> > >>
> > >>>> But when I am reading the KTable, I have no guarantee to see all
> > >>>> messages with the same key (because of the commit.interval.ms
> > >>>> configuration).
> > >>
> > >> Can you elaborate? I actually think an aggregation should be the
> > >> correct operator to use. However, you would need to return a pair with
> > >> old/new value and do a consecutive mapValues that computes the diff
> > >> over both -- i.e., it's a two-step approach instead of a single
> > >> aggregation.
> > >>
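> > >> A rough sketch of that two-step shape (OldAndNew and diff() are
> > >> placeholder names I am assuming, not an existing API):
> > >>
> > >>     KTable<K, OldAndNew<V>> pairs = stream
> > >>         .groupByKey()
> > >>         .aggregate(
> > >>             OldAndNew::empty,
> > >>             // step 1: keep the previous value next to the incoming one
> > >>             (key, value, agg) -> new OldAndNew<>(agg.current(), value),
> > >>             ...);
> > >>
> > >>     // step 2: compute the diff over the pair and emit it
> > >>     pairs.toStream()
> > >>          .mapValues(pair -> diff(pair.old(), pair.current()))
> > >>          .to(outputTopic);
> > >>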
> > >> About the Producer solution: this is indeed not recommended. As for
> > >> the exception itself, it's a known bug, and a fix was proposed via
> > >> KIP-91.
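> > >>
> > >> (Until that fix is available, a common mitigation is to give the
> > >> producer more time before expiring batches; the value here is
> > >> hypothetical:
> > >>
> > >>     props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
> > >>
> > >> but the two-step aggregation above avoids the extra producer
> > >> entirely.)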
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 12/22/17 5:26 AM, Cedric BERTRAND wrote:
> > >>> Hello,
> > >>>
> > >>> I have been using Kafka Streams for some months, and I have not
> > >>> managed to implement this use case:
> > >>>
> > >>> I need to compare two objects: a new one and the old one.
> > >>> At the end, I need to send a message with the diff between the two
> > >>> objects.
> > >>>
> > >>> My first implementation was to use an aggregate and to return the
> > >>> diff in the resulting KTable.
> > >>> But when I am reading the KTable, I have no guarantee to see all
> > >>> messages with the same key (because of the commit.interval.ms
> > >>> configuration).
> > >>>
> > >>> I have tried another implementation (see the code below).
> > >>> I compute the diff in the aggregate method and I send the message
> > >>> with a KafkaProducer that is created outside Kafka Streams.
> > >>> By doing this, my code works, but under heavy load I get the
> > >>> following error message (many, many times):
> > >>>
> > >>> org.apache.kafka.common.errors.TimeoutException: Expiring 16 record(s)
> > >>> for vision360-events-client-00-12: 38949 ms has passed since batch
> > >>> creation plus linger time
> > >>>
> > >>> The fact that the Producer is created outside the Kafka Streams DAG
> > >>> seems like a bad implementation.
> > >>>
> > >>>
> > >>> final Producer<K, V> producer = ....
> > >>>
> > >>> stream
> > >>>     .groupByKey()
> > >>>     .aggregate(
> > >>>         () -> ...,
> > >>>         (key, value, aggregate) -> {
> > >>>             ...
> > >>>             producer.send(new ProducerRecord<>(topic, key, myValue),
> > >>>                 (metadata, exception) -> {
> > >>>                     if (exception != null) {
> > >>>                         LOGGER.error(MessageFormatter.format(
> > >>>                             "Failed to send event : {}", key).getMessage(),
> > >>>                             exception);
> > >>>                     }
> > >>>                 });
> > >>>             return myAggregateValue;
> > >>>         },
> > >>>         ...,
> > >>>         ....
> > >>>     )
> > >>>     .mapValues(...)
> > >>>     .to(topic);
> > >>>
> > >>> Is there a good way to do this in Kafka:
> > >>> an aggregation that sends a message with the diff for every message
> > >>> with the same key?
> > >>>
> > >>>
> > >>> Thanks
> > >>>
> > >>> Cedric
> > >>>
> > >>
> > >>
> > >
> >
> >
>



-- 
Regards,
Puneet
