Hello,

I've been using Kafka Streams for some months and I haven't managed to
implement this use case:

I need to compare two objects: a new one and the old one.
In the end, I need to send a message with the diff between the two objects.
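
To make it concrete, for a single key the behaviour I'm after is roughly
this (the values below are only an illustration):

    input : (key, v1), (key, v2), (key, v3)
    output: (key, diff(v1, v2)), (key, diff(v2, v3))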

My first implementation was to use an aggregate and return the diff in the
resulting KTable, roughly as sketched below.
But when I read the KTable, I have no guarantee of seeing all messages
with the same key (because of the commit.interval.ms configuration).
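
That first attempt looked roughly like this (assuming the usual
org.apache.kafka.streams imports; MyDiff, computeDiff, keySerde, diffSerde
and outputTopic are just placeholders for illustration, not the real code):

stream
    .groupByKey()
    .aggregate(
        MyDiff::new,                     // initializer: empty state
        (key, newValue, aggregate) ->
            computeDiff(aggregate, newValue),   // the returned diff becomes the new KTable value
        Materialized.with(keySerde, diffSerde))
    .toStream()
    .to(outputTopic);                    // each KTable update should end up in the output topic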

I have tried another implementation (see the code below).
I compute the diff in the aggregate method and send the message through a
KafkaProducer that is created outside of Kafka Streams.
With this approach my code works, but under heavy load I get the following
error message (many, many times):

org.apache.kafka.common.errors.TimeoutException: Expiring 16 record(s) for
vision360-events-client-00-12: 38949 ms has passed since batch creation
plus linger time

Creating the Producer outside of the Kafka Streams DAG seems like a bad
implementation.


final Producer<K, V> producer = ....   // plain KafkaProducer created outside the Streams topology

stream
    .groupByKey()
    .aggregate(
        () -> ...,
        (key, value, aggregate) -> {
            ...
            // send the diff directly from inside the aggregator
            producer.send(new ProducerRecord<>(topic, key, myValue),
                (metadata, exception) -> {
                    if (exception != null) {
                        LOGGER.error(MessageFormatter.format(
                            "Failed to send event: {}", key).getMessage(), exception);
                    }
                });
            return myAggregateValue;
        },
        ...,
        ....
    )
    .mapValues(...)
    .to(topic);

Is there a good way to do this in Kafka:
an aggregation that sends a message with a diff for all messages with the
same key?


Thanks

Cedric
