cadonna commented on PR #16684: URL: https://github.com/apache/kafka/pull/16684#issuecomment-2249925420
@loicgreffier Thanks for the investigation and the PR!

It seems that the issue occurs when a record is stored and then forwarded independently of the current input record. That might happen in different places. The ones that come to my mind are:
- store caches
- buffers (`RocksDBTimeOrderedKeyValueBuffer` and `InMemoryTimeOrderedKeyValueChangeBuffer`)

For the buffers, I see that the raw record is not serialized to the changelog topic or the store. That means that, for records in the in-memory buffer, the raw record will always be null after a failover. With the RocksDB-based buffer, the records will never have a raw record attached. When records without a raw record are evicted from those buffers, they will cause the `NullPointerException` if an exception occurs during processing after the buffer.

Have you considered serializing the raw record? Even if you serialize the raw record, to be backward compatible, Streams needs to be able to read record contexts without the raw record (written by previous versions of Streams). Thus, Streams cannot always provide a raw record to the error context and you need to account for that.

@mjsax could you double-check my understanding?
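
To illustrate the backward-compatibility point, here is a minimal Java sketch. It is not the actual Streams code: `RawRecordContextSketch`, `StoredContext`, and the byte layout (timestamp and offset, optionally followed by a length-prefixed raw value) are all hypothetical and chosen only for illustration. It assumes the legacy layout has a fixed length, so trailing bytes can signal the presence of a raw value; a real format would likely need a different way to tell the two apart.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RawRecordContextSketch {

    /** Hypothetical stand-in for a stored record context; not a Kafka Streams class. */
    static final class StoredContext {
        final long timestamp;
        final long offset;
        final byte[] rawValue; // null when the entry was written without a raw record

        StoredContext(long timestamp, long offset, byte[] rawValue) {
            this.timestamp = timestamp;
            this.offset = offset;
            this.rawValue = rawValue;
        }
    }

    /** Assumed layout of entries written by an older version: timestamp and offset only. */
    static byte[] serializeLegacy(long timestamp, long offset) {
        return ByteBuffer.allocate(2 * Long.BYTES)
            .putLong(timestamp)
            .putLong(offset)
            .array();
    }

    /** Assumed newer layout: the legacy fields followed by a length-prefixed raw value. */
    static byte[] serialize(StoredContext context) {
        if (context.rawValue == null) {
            return serializeLegacy(context.timestamp, context.offset);
        }
        return ByteBuffer.allocate(2 * Long.BYTES + Integer.BYTES + context.rawValue.length)
            .putLong(context.timestamp)
            .putLong(context.offset)
            .putInt(context.rawValue.length)
            .put(context.rawValue)
            .array();
    }

    /** Reads both layouts; entries without trailing bytes yield a null raw value. */
    static StoredContext deserialize(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long timestamp = buffer.getLong();
        long offset = buffer.getLong();
        if (!buffer.hasRemaining()) {
            return new StoredContext(timestamp, offset, null);
        }
        byte[] rawValue = new byte[buffer.getInt()];
        buffer.get(rawValue);
        return new StoredContext(timestamp, offset, rawValue);
    }

    /** Null-safe accessor of the kind an error handler would need; -1 means "no raw record". */
    static int rawValueSizeOrMinusOne(StoredContext context) {
        return context.rawValue == null ? -1 : context.rawValue.length;
    }

    public static void main(String[] args) {
        StoredContext legacy = deserialize(serializeLegacy(5L, 7L));
        byte[] raw = "payload".getBytes(StandardCharsets.UTF_8);
        StoredContext current = deserialize(serialize(new StoredContext(5L, 7L, raw)));
        System.out.println(rawValueSizeOrMinusOne(legacy));
        System.out.println(rawValueSizeOrMinusOne(current));
    }
}
```

The key point is that the reader side has to tolerate a missing raw record no matter what the writer does, so any code that consumes the error context needs the null check regardless.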
