[jira] [Resolved] (KAFKA-8214) Handling RecordTooLargeException in the main thread

2019-04-11 Thread Mohan Parthasarathy (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-8214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mohan Parthasarathy resolved KAFKA-8214.

Resolution: Duplicate

> Handling RecordTooLargeException in the main thread
> ---
>
> Key: KAFKA-8214
> URL: https://issues.apache.org/jira/browse/KAFKA-8214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
> Affects Versions: 1.0.2
> Reporter: Mohan Parthasarathy
> Priority: Major
>
> How can we handle this exception in the main application? When a task hits
> this exception, it does not commit the offset, so it keeps reprocessing the
> same record in a loop. This happens during the aggregation process. We have
> already set the topic's maximum message size to 15 MB.
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process.
> taskId=2_6, processor=KSTREAM-SOURCE-16,
> topic=r-detection-KSTREAM-AGGREGATE-STATE-STORE-12-repartition,
> partition=6, offset=2049
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_6] Abort
> sending since an error caught with a previous record (key
> fe80::a112:a206:bc15:8e86::743c:160:c0be:9e66&0 value [B@20dced9e
> timestamp 1554238297629) to topic
> -detection-KSTREAM-AGGREGATE-STATE-STORE-12-changelog due to
> org.apache.kafka.common.errors.RecordTooLargeException: The message is
> 15728866 bytes when serialized which is larger than the maximum request size
> you have configured with the max.request.size configuration.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:192)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:100)
>     at
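
The question in the report, how the application can handle this exception instead of looping on the same record, is what the ProductionExceptionHandler interface added in Kafka Streams 1.1.0 (KIP-210) addresses, which is presumably why the issue was resolved as a duplicate; it is not available in the 1.0.2 release reported here. A minimal sketch, assuming an upgrade to 1.1.0 or later; the class name SkipOversizedRecordsHandler is made up for illustration:

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Hypothetical handler: drop records the producer rejects as too large
// instead of failing the stream thread, so the offset is committed and
// processing moves on.
public class SkipOversizedRecordsHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;  // skip this record
        }
        return ProductionExceptionHandlerResponse.FAIL;          // any other error still stops the app
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}

The handler is registered via default.production.exception.handler (StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG). Returning CONTINUE silently drops the oversized aggregate, so this is only appropriate when losing such records is acceptable; otherwise the record size itself has to be addressed.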

[jira] [Created] (KAFKA-8214) Handling RecordTooLargeException in the main thread

2019-04-10 Thread Mohan Parthasarathy (JIRA)
Mohan Parthasarathy created KAFKA-8214:
--

 Summary: Handling RecordTooLargeException in the main thread
 Key: KAFKA-8214
 URL: https://issues.apache.org/jira/browse/KAFKA-8214
 Project: Kafka
  Issue Type: Bug
 Environment: Kafka version 1.0.2
Reporter: Mohan Parthasarathy


How can we handle this exception in the main application? When a task hits this
exception, it does not commit the offset, so it keeps reprocessing the same
record in a loop. This happens during the aggregation process. We have already
set the topic's maximum message size to 15 MB.


org.apache.kafka.streams.errors.StreamsException: Exception caught in process.
taskId=2_6, processor=KSTREAM-SOURCE-16,
topic=r-detection-KSTREAM-AGGREGATE-STATE-STORE-12-repartition,
partition=6, offset=2049
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_6] Abort
sending since an error caught with a previous record (key
fe80::a112:a206:bc15:8e86::743c:160:c0be:9e66&0 value [B@20dced9e
timestamp 1554238297629) to topic
-detection-KSTREAM-AGGREGATE-STATE-STORE-12-changelog due to
org.apache.kafka.common.errors.RecordTooLargeException: The message is 15728866
bytes when serialized which is larger than the maximum request size you have
configured with the max.request.size configuration.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:192)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:100)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
    at
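
The RecordTooLargeException in the trace is raised on the client side: the serialized changelog record (15728866 bytes) exceeds the producer's max.request.size, so it is rejected before the 15 MB topic limit is even consulted. If records of this size are expected, the limit on the embedded producer can be raised through the producer prefix of the Streams configuration. A sketch with placeholder values (the application id and bootstrap server below are assumptions); the broker's message.max.bytes and the changelog topic's max.message.bytes must also admit records of that size, or the broker will reject them instead:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public final class StreamsProducerSizeConfig {

    // Build Streams properties whose embedded producer accepts requests up to 20 MB.
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "r-detection-app");   // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");    // placeholder broker

        // Expands to "producer.max.request.size", applied only to the producer inside Streams.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                  20 * 1024 * 1024);
        return props;
    }
}

Raising the limit only moves the ceiling; if the aggregated value keeps growing without bound, windowing or trimming the aggregate is the more durable fix.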