[jira] [Resolved] (KAFKA-8214) Handling RecordTooLargeException in the main thread
[ https://issues.apache.org/jira/browse/KAFKA-8214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mohan Parthasarathy resolved KAFKA-8214. Resolution: Duplicate > Handling RecordTooLargeException in the main thread > --- > > Key: KAFKA-8214 > URL: https://issues.apache.org/jira/browse/KAFKA-8214 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.2 >Reporter: Mohan Parthasarathy >Priority: Major > > How can we handle this exception in the main application ? If this task > incurs this exception, then it does not commit the offset and hence it goes > in a loop after that. This happens during aggregation process. We already > have a limit on the message size of the topic which is 15 MB. > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=2_6, processor=KSTREAM-SOURCE-16, > topic=r-detection-KSTREAM-AGGREGATE-STATE-STORE-12-repartition, > partition=6, offset=2049 > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367) > > > at > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104) > > > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413) > > > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862) > > > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) > > > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) > > Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_6] Abort > sending since an error caught with a previous record (key > fe80::a112:a206:bc15:8e86::743c:160:c0be:9e66&0 value [B@20dced9e > timestamp 1554238297629) to topic > -detection-KSTREAM-AGGREGATE-STATE-STORE-12-changelog due to > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 15728866 bytes when serialized which is larger than the maximum request size > you have configured with the max.request.size configuration. > > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133) > > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50) > > > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:192) > > at > org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915) > > > at > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841) > > > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162) > > > at > org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) > > > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66) > > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31) > > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:100) > > at >
[jira] [Created] (KAFKA-8214) Handling RecordTooLargeException in the main thread
Mohan Parthasarathy created KAFKA-8214: -- Summary: Handling RecordTooLargeException in the main thread Key: KAFKA-8214 URL: https://issues.apache.org/jira/browse/KAFKA-8214 Project: Kafka Issue Type: Bug Environment: Kafka version 1.0.2 Reporter: Mohan Parthasarathy How can we handle this exception in the main application ? If this task incurs this exception, then it does not commit the offset and hence it goes in a loop after that. This happens during aggregation process. We already have a limit on the message size of the topic which is 15 MB. org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_6, processor=KSTREAM-SOURCE-16, topic=r-detection-KSTREAM-AGGREGATE-STATE-STORE-12-repartition, partition=6, offset=2049 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367) at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_6] Abort sending since an error caught with a previous record (key fe80::a112:a206:bc15:8e86::743c:160:c0be:9e66&0 value [B@20dced9e timestamp 1554238297629) to topic -detection-KSTREAM-AGGREGATE-STATE-STORE-12-changelog due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 15728866 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:192) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162) at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:100) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38) at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83) at