mjsax commented on code in PR #15691:
URL: https://github.com/apache/kafka/pull/15691#discussion_r1614060982


##########
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java:
##########
@@ -17,21 +17,73 @@
 package org.apache.kafka.common.errors;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
+
+import java.nio.ByteBuffer;
 
 /**
 *  This exception is raised for any error that occurs while deserializing records received by the consumer using
  *  the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationException extends SerializationException {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
+
+    public enum DeserializationExceptionOrigin {
+        KEY,
+        VALUE
+    }
+
+    private final DeserializationExceptionOrigin origin;
     private final TopicPartition partition;
     private final long offset;
+    private final TimestampType timestampType;
+    private final long timestamp;
+    private final ByteBuffer keyBuffer;
+    private final ByteBuffer valueBuffer;
+    private final Headers headers;
 
-    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
+    @Deprecated
+    public RecordDeserializationException(TopicPartition partition,
+                                          long offset,
+                                          String message,
+                                          Throwable cause) {
         super(message, cause);
+        this.origin = DeserializationExceptionOrigin.VALUE;

Review Comment:
   Should we set this to `null`? If the old constructor is used, we just don't know the origin.
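A minimal stand-in sketch of the reviewer's suggestion (this is not the actual Kafka class; the class and accessor here are simplified stand-ins): leaving `origin` null in the deprecated constructor lets callers distinguish "unknown" from a misleading default of `VALUE`.

```java
// Hypothetical stand-in for the class under review, illustrating the
// suggestion of leaving `origin` null when the old constructor is used.
public class Main {
    enum DeserializationExceptionOrigin { KEY, VALUE }

    static class RecordDeserializationException extends RuntimeException {
        private final DeserializationExceptionOrigin origin;

        // Old-style constructor: the origin is genuinely unknown here,
        // so record null instead of defaulting to VALUE.
        RecordDeserializationException(String message, Throwable cause) {
            super(message, cause);
            this.origin = null;
        }

        DeserializationExceptionOrigin origin() {
            return origin;
        }
    }

    public static void main(String[] args) {
        RecordDeserializationException e =
                new RecordDeserializationException("deserialization failed", null);
        // null signals "unknown origin" rather than a possibly-wrong VALUE
        System.out.println(e.origin() == null ? "unknown" : e.origin().name());
    }
}
```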



##########
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java:
##########
@@ -17,21 +17,73 @@
 package org.apache.kafka.common.errors;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
+
+import java.nio.ByteBuffer;
 
 /**
 *  This exception is raised for any error that occurs while deserializing records received by the consumer using
  *  the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationException extends SerializationException {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
+
+    public enum DeserializationExceptionOrigin {
+        KEY,
+        VALUE
+    }
+
+    private final DeserializationExceptionOrigin origin;
     private final TopicPartition partition;
     private final long offset;
+    private final TimestampType timestampType;
+    private final long timestamp;
+    private final ByteBuffer keyBuffer;
+    private final ByteBuffer valueBuffer;
+    private final Headers headers;
 
-    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
+    @Deprecated
+    public RecordDeserializationException(TopicPartition partition,
+                                          long offset,
+                                          String message,
+                                          Throwable cause) {
         super(message, cause);
+        this.origin = DeserializationExceptionOrigin.VALUE;
         this.partition = partition;
         this.offset = offset;
+        this.timestampType = TimestampType.NO_TIMESTAMP_TYPE;
+        this.timestamp = 0;

Review Comment:
   Should we use `ConsumerRecord.NO_TIMESTAMP` (which is `-1`)?
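A short sketch of why the sentinel matters (the `NO_TIMESTAMP` constant below is a local stand-in mirroring `ConsumerRecord.NO_TIMESTAMP`, which is `-1L` in the Kafka client): `0` is a legal epoch-millis timestamp, so it cannot serve as a "no timestamp" marker.

```java
// Stand-in sketch: 0 collides with a real timestamp (1970-01-01T00:00:00Z),
// while -1 is unambiguous as a sentinel.
public class Main {
    static final long NO_TIMESTAMP = -1L; // mirrors ConsumerRecord.NO_TIMESTAMP

    public static void main(String[] args) {
        long legacyDefault = 0L;      // what the diff currently assigns
        long sentinel = NO_TIMESTAMP; // what the reviewer suggests

        System.out.println(legacyDefault); // indistinguishable from a valid value
        System.out.println(sentinel);      // cannot be a real epoch-millis value
    }
}
```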



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -311,25 +312,37 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
                                             Optional<Integer> leaderEpoch,
                                             TimestampType timestampType,
                                             Record record) {
+        ByteBuffer keyBytes = record.key();
+        ByteBuffer valueBytes = record.value();
+        Headers headers = new RecordHeaders(record.headers());
+        K key;
+        V value;
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
-            ByteBuffer valueBytes = record.value();
-            V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                    timestamp, timestampType,
-                    keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
-                    valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
-                    key, value, headers, leaderEpoch);
+            key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
         } catch (RuntimeException e) {
-            log.error("Deserializers with error: {}", deserializers);

Review Comment:
   Why was this log line dropped?
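A stand-in sketch of the point behind this comment (using `System.err` in place of the SLF4J logger that `CompletedFetch` actually uses, and a string stand-in for the `Deserializers` object): logging which deserializers were in play before the exception is wrapped keeps a diagnostic that is otherwise lost once the wrapped exception is handled upstream.

```java
// Sketch only: the real code logs via SLF4J and rethrows a
// RecordDeserializationException; this stand-in shows the shape of
// keeping the diagnostic inside the catch block.
public class Main {
    public static void main(String[] args) {
        String deserializers = "Deserializers{keyDeserializer=..., valueDeserializer=...}";
        try {
            throw new RuntimeException("malformed record bytes");
        } catch (RuntimeException e) {
            // Emit the diagnostic before wrapping/rethrowing, so the
            // deserializer context is not silently dropped.
            System.err.println("Deserializers with error: " + deserializers);
            System.out.println("wrapped: " + e.getMessage());
        }
    }
}
```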



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
