[ https://issues.apache.org/jira/browse/KAFKA-6923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622835#comment-16622835 ]

ASF GitHub Bot commented on KAFKA-6923:
---------------------------------------

hachikuji closed pull request #5494: [KAFKA-6923] Refactoring Serializer/Deserializer
URL: https://github.com/apache/kafka/pull/5494
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 36a3314e698..c9aca105632 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -62,7 +62,6 @@
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -89,7 +88,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyList;
-import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
 
 /**
 * This class manages the fetching process with the brokers.
@@ -112,8 +110,8 @@
     private final SubscriptionState subscriptions;
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
     private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
-    private final ExtendedDeserializer<K> keyDeserializer;
-    private final ExtendedDeserializer<V> valueDeserializer;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
     private final IsolationLevel isolationLevel;
     private final Map<Integer, FetchSessionHandler> sessionHandlers;
     private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
@@ -150,8 +148,8 @@ public Fetcher(LogContext logContext,
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
         this.checkCrcs = checkCrcs;
-        this.keyDeserializer = ensureExtended(keyDeserializer);
-        this.valueDeserializer = ensureExtended(valueDeserializer);
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
         this.completedFetches = new ConcurrentLinkedQueue<>();
         this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
         this.retryBackoffMs = retryBackoffMs;
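
For context: the ensureExtended calls deleted here (and throughout the rest of
this diff) came from ExtendedDeserializer.Wrapper and its ExtendedSerializer
counterpart, adapters that wrapped a plain (de)serializer so internal code could
always call the headers-aware overload. A rough sketch of that now-redundant
helper, reconstructed from the pre-2.1 API (details may differ slightly from the
exact upstream code):

{code}
// Hypothetical reconstruction of the pre-2.1 helper this PR removes.
public static <T> ExtendedDeserializer<T> ensureExtended(Deserializer<T> deserializer) {
    if (deserializer == null)
        return null;
    // Already header-aware: use it as-is.
    if (deserializer instanceof ExtendedDeserializer)
        return (ExtendedDeserializer<T>) deserializer;
    // Otherwise wrap it; the wrapper's deserialize(topic, headers, data)
    // ignored the headers and delegated to deserialize(topic, data).
    return new ExtendedDeserializer.Wrapper<>(deserializer);
}
{code}

Once Deserializer itself carries a default deserialize(topic, headers, data)
overload (see the Deserializer.java hunk below), every implementation is
header-aware by construction and the wrapping step becomes unnecessary.
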
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 2a35b30f259..b3f76eee49f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -73,7 +73,6 @@
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
@@ -81,7 +80,6 @@
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 
-import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
 
 /**
  * A Kafka client that publishes records to the Kafka cluster.
@@ -250,8 +248,8 @@
     private final CompressionType compressionType;
     private final Sensor errors;
     private final Time time;
-    private final ExtendedSerializer<K> keySerializer;
-    private final ExtendedSerializer<V> valueSerializer;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
     private final ProducerConfig producerConfig;
     private final long maxBlockTimeMs;
     private final int requestTimeoutMs;
@@ -361,20 +359,20 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             if (keySerializer == null) {
-                this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                                                                                          Serializer.class));
+                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                                                                                          Serializer.class);
                 this.keySerializer.configure(config.originals(), true);
             } else {
                 config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-                this.keySerializer = ensureExtended(keySerializer);
+                this.keySerializer = keySerializer;
             }
             if (valueSerializer == null) {
-                this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                                                                                            Serializer.class));
+                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                                                                                            Serializer.class);
                 this.valueSerializer.configure(config.originals(), false);
             } else {
                 config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-                this.valueSerializer = ensureExtended(valueSerializer);
+                this.valueSerializer = valueSerializer;
             }
 
             // load interceptors and make sure they get clientId
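
Both construction paths in the hunk above behave the same after the change,
minus the wrapping step. A minimal usage sketch of the two paths (broker
address and serializer choices are illustrative):

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConstructionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Path 1: serializers named in config; the producer instantiates
        // and configures them (the branch taken when keySerializer == null).
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> fromConfig = new KafkaProducer<>(props);

        // Path 2: instances passed directly; the config keys are ignored.
        KafkaProducer<String, String> fromInstances =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        fromConfig.close();
        fromInstances.close();
    }
}
{code}
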
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index ab036a0992c..a38bd04f906 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -28,7 +28,6 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
 import java.util.ArrayDeque;
@@ -41,8 +40,6 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
-
 /**
 * A mock of the producer interface you can use for testing code that uses Kafka.
  * <p>
@@ -59,8 +56,8 @@
     private final Map<TopicPartition, Long> offsets;
     private final List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsets;
     private Map<String, Map<TopicPartition, OffsetAndMetadata>> uncommittedConsumerGroupOffsets;
-    private final ExtendedSerializer<K> keySerializer;
-    private final ExtendedSerializer<V> valueSerializer;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
     private boolean autoComplete;
     private boolean closed;
     private boolean transactionInitialized;
@@ -93,8 +90,8 @@ public MockProducer(final Cluster cluster,
         this.cluster = cluster;
         this.autoComplete = autoComplete;
         this.partitioner = partitioner;
-        this.keySerializer = ensureExtended(keySerializer);
-        this.valueSerializer = ensureExtended(valueSerializer);
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
         this.offsets = new HashMap<>();
         this.sent = new ArrayList<>();
         this.uncommittedSends = new ArrayList<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index f9eb398cbee..bc1a714c35e 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.header.Headers;
+
 import java.io.Closeable;
 import java.util.Map;
 
@@ -45,6 +47,17 @@
      */
     T deserialize(String topic, byte[] data);
 
+    /**
+     * Deserialize a record value from a byte array into a value or object.
+     * @param topic topic associated with the data
+     * @param headers headers associated with the record; may be empty.
+     * @param data serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.
+     * @return deserialized typed data; may be null
+     */
+    default T deserialize(String topic, Headers headers, byte[] data) {
+        return deserialize(topic, data);
+    }
+
     @Override
     void close();
 }
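
With this default method in place, a header-aware implementation overrides the
three-argument overload and implements Deserializer directly, with no need for
ExtendedDeserializer. A minimal sketch (the class name and header handling are
illustrative, not part of the PR):

{code}
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

public class ContentTypeAwareDeserializer implements Deserializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        // Inspect a record header, then fall back to the plain overload.
        Header contentType = headers.lastHeader("content-type");
        if (contentType != null) {
            // e.g. choose a decoding strategy based on the header value
        }
        return deserialize(topic, data);
    }

    @Override
    public void close() { }
}
{code}
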
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
index 2d5be4a148d..2f4a012fc8f 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
@@ -29,7 +29,9 @@
  *
  * A class that implements this interface is expected to have a constructor with no parameters.
  * @param <T>
+ * @deprecated This class has been deprecated and will be removed in a future release. Please use {@link Deserializer} instead.
  */
+@Deprecated
 public interface ExtendedDeserializer<T> extends Deserializer<T> {
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
index 14fbb47b1db..8c949807a03 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
@@ -29,7 +29,9 @@
  *
  * A class that implements this interface is expected to have a constructor with no parameters.
  * @param <T>
+ * @deprecated This class has been deprecated and will be removed in a future release. Please use {@link Serializer} instead.
  */
+@Deprecated
 public interface ExtendedSerializer<T> extends Serializer<T> {
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index 96fe86b4bd9..c5d4760d381 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.header.Headers;
+
 import java.io.Closeable;
 import java.util.Map;
 
@@ -46,6 +48,18 @@
      */
     byte[] serialize(String topic, T data);
 
+    /**
+     * Convert {@code data} into a byte array.
+     *
+     * @param topic topic associated with data
+     * @param headers headers associated with the record
+     * @param data typed data
+     * @return serialized bytes
+     */
+    default byte[] serialize(String topic, Headers headers, T data) {
+        return serialize(topic, data);
+    }
+
     /**
      * Close this serializer.
      *
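
The producer side mirrors this. A minimal header-stamping Serializer along the
lines of the integration test further down (the class name and header value are
illustrative):

{code}
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ContentTypeTaggingSerializer implements Serializer<String> {
    private final StringSerializer inner = new StringSerializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, String data) {
        return inner.serialize(topic, data);
    }

    @Override
    public byte[] serialize(String topic, Headers headers, String data) {
        // Stamp each outgoing record with a content-type header, then delegate.
        headers.add("content-type", "text/plain".getBytes(StandardCharsets.UTF_8));
        return serialize(topic, data);
    }

    @Override
    public void close() {
        inner.close();
    }
}
{code}
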
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 22fa0a1bfc0..676aafdc57d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.ExtendedSerializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -416,15 +417,25 @@ public void testTopicRefreshInMetadata() throws Exception {
         Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
     }
 
+    @SuppressWarnings("unchecked") // safe as generic parameters won't vary
+    @PrepareOnlyThisForTest(Metadata.class)
+    @Test
+    public void testHeadersWithExtendedClasses() throws Exception {
+        doTestHeaders(ExtendedSerializer.class);
+    }
+
+    @SuppressWarnings("unchecked")
     @PrepareOnlyThisForTest(Metadata.class)
     @Test
     public void testHeaders() throws Exception {
+        doTestHeaders(Serializer.class);
+    }
+
+    private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) throws Exception {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
-        ExtendedSerializer<String> keySerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
-        @SuppressWarnings("unchecked")
-        ExtendedSerializer<String> valueSerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
+        T keySerializer = PowerMock.createNiceMock(serializerClassToMock);
+        T valueSerializer = PowerMock.createNiceMock(serializerClassToMock);
 
         KafkaProducer<String, String> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
         Metadata metadata = PowerMock.createNiceMock(Metadata.class);
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 983b63cdeeb..2c0c8a4f40a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -66,61 +66,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
-  @Test
-  def testHeadersExtendedSerializerDeserializer() {
-    val numRecords = 1
-    val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
-
-    val extendedSerializer = new ExtendedSerializer[Array[Byte]] {
-
-      var serializer = new ByteArraySerializer()
-
-      override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
-        headers.add("content-type", "application/octet-stream".getBytes)
-        serializer.serialize(topic, data)
-      }
+  trait SerializerImpl {
+    var serializer = new ByteArraySerializer()
 
-      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = serializer.configure(configs, isKey)
-
-      override def close(): Unit = serializer.close()
-
-      override def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
-        fail("method should not be invoked")
-        null
-      }
+    def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
+      headers.add("content-type", "application/octet-stream".getBytes)
+      serializer.serialize(topic, data)
     }
 
+    def configure(configs: util.Map[String, _], isKey: Boolean): Unit = serializer.configure(configs, isKey)
 
-    val extendedDeserializer = new ExtendedDeserializer[Array[Byte]] {
-
-      var deserializer = new ByteArrayDeserializer()
+    def close(): Unit = serializer.close()
 
-      override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
-        val header = headers.lastHeader("content-type")
-        assertEquals("application/octet-stream", if (header == null) null else new String(header.value()))
-        deserializer.deserialize(topic, data)
-      }
+    def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
+      fail("method should not be invoked")
+      null
+    }
+  }
 
-      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = deserializer.configure(configs, isKey)
+  trait DeserializerImpl {
+    var deserializer = new ByteArrayDeserializer()
 
+    def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
+      val header = headers.lastHeader("content-type")
+      assertEquals("application/octet-stream", if (header == null) null else new String(header.value()))
+      deserializer.deserialize(topic, data)
+    }
 
-      override def close(): Unit = deserializer.close()
+    def configure(configs: util.Map[String, _], isKey: Boolean): Unit = deserializer.configure(configs, isKey)
 
-      override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = {
-        fail("method should not be invoked")
-        null
-      }
+    def close(): Unit = deserializer.close()
 
+    def deserialize(topic: String, data: Array[Byte]): Array[Byte] = {
+      fail("method should not be invoked")
+      null
     }
+  }
+
+  private def testHeadersSerializeDeserialize(serializer: Serializer[Array[Byte]], deserializer: Deserializer[Array[Byte]]): Unit = {
+    val numRecords = 1
+    val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
 
     val producer = createProducer(
       keySerializer = new ByteArraySerializer,
-      valueSerializer = extendedSerializer)
+      valueSerializer = serializer)
     producer.send(record)
 
     val consumer = createConsumer(
       keyDeserializer = new ByteArrayDeserializer,
-      valueDeserializer = extendedDeserializer)
+      valueDeserializer = deserializer)
     assertEquals(0, consumer.assignment.size)
     consumer.assign(List(tp).asJava)
     assertEquals(1, consumer.assignment.size)
@@ -131,6 +125,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(numRecords, records.size)
   }
 
+  @Test
+  def testHeadersExtendedSerializerDeserializer(): Unit = {
+    val extendedSerializer = new ExtendedSerializer[Array[Byte]] with SerializerImpl
+    val extendedDeserializer = new ExtendedDeserializer[Array[Byte]] with DeserializerImpl
+
+    testHeadersSerializeDeserialize(extendedSerializer, extendedDeserializer)
+  }
+
+  @Test
+  def testHeadersSerializerDeserializer(): Unit = {
+    val extendedSerializer = new Serializer[Array[Byte]] with SerializerImpl
+    val extendedDeserializer = new Deserializer[Array[Byte]] with DeserializerImpl
+
+    testHeadersSerializeDeserialize(extendedSerializer, extendedDeserializer)
+  }
+
   @Test
   def testMaxPollRecords() {
     val maxPollRecords = 2
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 29180f2ddb0..56193d570ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -18,29 +18,26 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
-
-public class ChangedDeserializer<T> implements ExtendedDeserializer<Change<T>> {
+public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
 
     private static final int NEWFLAG_SIZE = 1;
 
-    private ExtendedDeserializer<T> inner;
+    private Deserializer<T> inner;
 
     public ChangedDeserializer(final Deserializer<T> inner) {
-        this.inner = ensureExtended(inner);
+        this.inner = inner;
     }
 
-    public ExtendedDeserializer<T> inner() {
+    public Deserializer<T> inner() {
         return inner;
     }
 
     public void setInner(final Deserializer<T> inner) {
-        this.inner = ensureExtended(inner);
+        this.inner = inner;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 83ac8e039ed..8a76badbc0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -17,23 +17,20 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
-
-public class ChangedSerializer<T> implements ExtendedSerializer<Change<T>> {
+public class ChangedSerializer<T> implements Serializer<Change<T>> {
 
     private static final int NEWFLAG_SIZE = 1;
 
-    private ExtendedSerializer<T> inner;
+    private Serializer<T> inner;
 
     public ChangedSerializer(final Serializer<T> inner) {
-        this.inner = ensureExtended(inner);
+        this.inner = inner;
     }
 
     public Serializer<T> inner() {
@@ -41,7 +38,7 @@ public ChangedSerializer(final Serializer<T> inner) {
     }
 
     public void setInner(final Serializer<T> inner) {
-        this.inner = ensureExtended(inner);
+        this.inner = inner;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 4aaba9e3f08..d7627fe9288 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -18,22 +18,19 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
 import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.List;
 
-import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
-
 public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
     private final List<String> topics;
 
     private ProcessorContext context;
-    private ExtendedDeserializer<K> keyDeserializer;
-    private ExtendedDeserializer<V> valDeserializer;
+    private Deserializer<K> keyDeserializer;
+    private Deserializer<V> valDeserializer;
     private final TimestampExtractor timestampExtractor;
 
     public SourceNode(final String name,
@@ -44,8 +41,8 @@ public SourceNode(final String name,
         super(name);
         this.topics = topics;
         this.timestampExtractor = timestampExtractor;
-        this.keyDeserializer = ensureExtended(keyDeserializer);
-        this.valDeserializer = ensureExtended(valDeserializer);
+        this.keyDeserializer = keyDeserializer;
+        this.valDeserializer = valDeserializer;
     }
 
     public SourceNode(final String name,
@@ -71,9 +68,9 @@ public void init(final InternalProcessorContext context) {
 
         // if deserializers are null, get the default ones from the context
         if (this.keyDeserializer == null)
-            this.keyDeserializer = ensureExtended((Deserializer<K>) context.keySerde().deserializer());
+            this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
         if (this.valDeserializer == null)
-            this.valDeserializer = ensureExtended((Deserializer<V>) context.valueSerde().deserializer());
+            this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
 
         // if value deserializers are for {@code Change} values, set the inner deserializer when necessary
         if (this.valDeserializer instanceof ChangedDeserializer &&


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Consolidate ExtendedSerializer/Serializer and 
> ExtendedDeserializer/Deserializer
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-6923
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6923
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Ismael Juma
>            Assignee: Viktor Somogyi
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 2.1.0
>
>
> The Javadoc of ExtendedDeserializer states:
> {code}
>  * Prefer {@link Deserializer} if access to the headers is not required. Once 
> Kafka drops support for Java 7, the
>  * {@code deserialize()} method introduced by this interface will be added to 
> Deserializer with a default implementation
>  * so that backwards compatibility is maintained. This interface may be 
> deprecated once that happens.
> {code}
> Since we have dropped Java 7 support, we should figure out how to do this. 
> There are compatibility implications, so a KIP is needed.
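
Put differently: once the default methods land, an existing Deserializer written
before headers existed keeps compiling and working unchanged, because the new
three-argument overload falls back to the two-argument one. A small
compatibility sketch (the class name is illustrative):

{code}
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

// A pre-headers implementation: it only overrides the two-argument method.
public class LegacyLongDeserializer implements Deserializer<Long> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public Long deserialize(String topic, byte[] data) {
        return data == null ? null : Long.parseLong(new String(data, StandardCharsets.UTF_8));
    }

    @Override
    public void close() { }
}

// Callers such as Fetcher can now uniformly invoke
// deserializer.deserialize(topic, headers, data); for this class that
// resolves to the default method, which forwards to deserialize(topic, data).
{code}
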



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
