Hello,

I have a Kafka Streams application that consumes from two topics and
internally aggregates, transforms, and joins the data. One of the
aggregation steps adds an id to an ArrayList of ids. Naturally, since
there was a lot of data, the changelog message eventually became too big
and could not be sent to the changelog topic, failing with the following
exception:

[ERROR]  (1-producer)
org.apache.kafka.streams.processor.internals.RecordCollectorImpl   -
task [2_2] Error sending record (key {"eventId":432897452,"version":1}
value [<byte array>] timestamp 1521832424795) to topic
<application-id>-KSTREAM-AGGREGATE-STATE-STORE-0000000016-changelog
due to {}; No more records will be sent and no more offsets will be
recorded for this task.
org.apache.kafka.common.errors.RecordTooLargeException: The request included
a message larger than the max message size the server will accept.
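
For context, the aggregation step in question looks roughly like this (a
simplified sketch rather than my exact code; EventKey, Event, EventKeySerde
and IdListSerde are placeholders for my real key/value classes and their
JSON Serdes):

import java.util.ArrayList;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// EventKey, Event and the two Serde classes below are placeholders for my
// real types; both key and value are serialized to/from JSON.
Serde<EventKey> eventKeySerde = new EventKeySerde();
Serde<ArrayList<Long>> idListSerde = new IdListSerde();

StreamsBuilder builder = new StreamsBuilder();
// Source serdes are configured elsewhere; only the aggregation matters here.
KStream<EventKey, Event> events = builder.stream("events");

// Aggregation step: collect all ids seen for a key into an ArrayList.
// The state store backing this aggregate is what produces the
// KSTREAM-AGGREGATE-STATE-STORE-...-changelog topic from the error above.
KTable<EventKey, ArrayList<Long>> idsPerKey = events
        .groupByKey()
        .aggregate(
                ArrayList::new,                        // initializer: empty list of ids
                (key, event, ids) -> {                 // aggregator: append the new id
                    ids.add(event.getId());
                    return ids;
                },
                Materialized.with(eventKeySerde, idListSerde));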

In this message the key is nicely formatted JSON, as it should be, but the
value is an enormous byte array instead of JSON. I checked the
corresponding changelog topic, and the messages that were logged before
this one are JSON strings. I am also using Serdes for both the key and the
value class. My question is: why is the key logged as JSON while the value
is logged as a byte array instead of JSON?
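
In case it is relevant, the value Serde is essentially a small JSON Serde
along these lines (again a simplified stand-in using Jackson; the key Serde
is built the same way for the key class):

import java.util.ArrayList;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Simplified stand-in for my value Serde: the ArrayList of ids is written
// to and read from JSON bytes with Jackson.
public class IdListSerde implements Serde<ArrayList<Long>> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Serializer<ArrayList<Long>> serializer() {
        return new Serializer<ArrayList<Long>>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }

            @Override
            public byte[] serialize(String topic, ArrayList<Long> ids) {
                try {
                    return ids == null ? null : mapper.writeValueAsBytes(ids);
                } catch (Exception e) {
                    throw new RuntimeException("Failed to serialize id list", e);
                }
            }

            @Override
            public void close() { }
        };
    }

    @Override
    public Deserializer<ArrayList<Long>> deserializer() {
        return new Deserializer<ArrayList<Long>>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }

            @Override
            public ArrayList<Long> deserialize(String topic, byte[] bytes) {
                try {
                    return bytes == null ? null : mapper.readValue(bytes,
                            mapper.getTypeFactory()
                                  .constructCollectionType(ArrayList.class, Long.class));
                } catch (Exception e) {
                    throw new RuntimeException("Failed to deserialize id list", e);
                }
            }

            @Override
            public void close() { }
        };
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public void close() { }
}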

Regards,
Mihaela Stoycheva
