GitHub user liyintang commented on the pull request:

    https://github.com/apache/spark/pull/11921#issuecomment-201073631
  
    I have verified that the issue is caused by serializing MessageAndMetadata.
    
    Previously:
    ```
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
    ```
    With this handler, serializing the KafkaRDD requires serializing every
    object inside MessageAndMetadata, including the slice of the ByteBuffer
    that backs the fetched data. This can push the block size past 2 GB.
    
    
    A simple workaround is:
    ```
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
    ```
    This handler deep-copies the key and message into new byte[] arrays during
    decoding, so only the copied bytes are serialized and the problem is
    avoided.
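
    For reference, here is a minimal sketch of how this handler plugs into the
    direct stream API (the broker list, topic, and offsets below are
    placeholders):
    ```
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("example"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder
    val fromOffsets = Map(TopicAndPartition("topic", 0) -> 0L)      // placeholder

    // Return (key, message) instead of mmd itself; key()/message() decode into
    // fresh objects, so the shared fetch buffer never reaches the serializer.
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)
    ```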
    
    
    Based on this finding, I don't think we need to move forward with
    deep-copying the message in the FetchResponse. Maybe we can log a warning
    advising users not to serialize MessageAndMetadata directly.
    
    Does that make sense? If so, I can close this PR and the JIRA.
    
    


