chia7712 commented on code in PR #16064: URL: https://github.com/apache/kafka/pull/16064#discussion_r1615204798
########## tools/tools-api/src/main/java/org/apache/kafka/tools/api/Decoder.java: ########## @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.api; + +@FunctionalInterface +public interface Decoder<T> { Review Comment: Please add documents for this public APIs ########## core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala: ########## @@ -598,6 +598,25 @@ class DumpLogSegmentsTest { ) } + @Test + def testNewDecoder(): Unit = { + // Decoder translate should pass without exception + DumpLogSegments.newDecoder(classOf[DumpLogSegmentsTest.TestDecoder].getName) Review Comment: Could you add UT to make sure the impl of deprecated decoded must have constructor with `VerifiableProperties` ########## core/src/main/scala/kafka/tools/DumpLogSegments.scala: ########## @@ -651,4 +651,36 @@ object DumpLogSegments { def checkArgs(): Unit = CommandLineUtils.checkRequiredArgs(parser, options, filesOpt) } + + /* + * The kafka.serializer.Decoder is deprecated in 3.8.0. This method is used to transfer the deprecated + * decoder to the new org.apache.kafka.tools.api.Decoder. Old decoders have an input VerifiableProperties. + * Remove it in new interface since it's always empty. + */ + private[tools] def newDecoder(className: String): Decoder[_] = { + try { + CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](convertDeprecatedDecoderClass(className)) + } catch { + case _: Exception => + val decoder = CoreUtils.createObject[kafka.serializer.Decoder[_]](className, new VerifiableProperties()) Review Comment: Please add comments to say "we always pass empty VerifiableProperties since this tool can't take custom configs ... bababa" ########## core/src/main/scala/kafka/tools/DumpLogSegments.scala: ########## @@ -651,4 +651,36 @@ object DumpLogSegments { def checkArgs(): Unit = CommandLineUtils.checkRequiredArgs(parser, options, filesOpt) } + + /* + * The kafka.serializer.Decoder is deprecated in 3.8.0. This method is used to transfer the deprecated + * decoder to the new org.apache.kafka.tools.api.Decoder. Old decoders have an input VerifiableProperties. + * Remove it in new interface since it's always empty. + */ + private[tools] def newDecoder(className: String): Decoder[_] = { + try { + CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](convertDeprecatedDecoderClass(className)) + } catch { + case _: Exception => + val decoder = CoreUtils.createObject[kafka.serializer.Decoder[_]](className, new VerifiableProperties()) + (bytes: Array[Byte]) => decoder.fromBytes(bytes) + } + } + + /* + * Covert deprecated decoder implementation to new decoder class. + */ + private[tools] def convertDeprecatedDecoderClass(className: String): String = { + if (className == "kafka.serializer.StringDecoder") { + classOf[StringDecoder].getName Review Comment: Please print warning messages to remind users that something is deprecated. ########## core/src/main/scala/kafka/tools/DumpLogSegments.scala: ########## @@ -604,14 +604,14 @@ object DumpLogSegments { .ofType(classOf[java.lang.Integer]) .defaultsTo(Integer.MAX_VALUE) private val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.") - private val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") + private val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement org.apache.kafka.tools.api.Decoder trait. Custom jar should be available in kafka/libs directory.") .withOptionalArg() .ofType(classOf[java.lang.String]) - .defaultsTo("kafka.serializer.StringDecoder") - private val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") + .defaultsTo("org.apache.kafka.tools.api.StringDecoder") + private val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement org.apache.kafka.tools.api.Decoder trait. Custom jar should be available in kafka/libs directory.") .withOptionalArg() .ofType(classOf[java.lang.String]) - .defaultsTo("kafka.serializer.StringDecoder") + .defaultsTo("org.apache.kafka.tools.api.StringDecoder") Review Comment: `classOf[StringDecoder].getName` ########## core/src/main/scala/kafka/tools/DumpLogSegments.scala: ########## @@ -604,14 +604,14 @@ object DumpLogSegments { .ofType(classOf[java.lang.Integer]) .defaultsTo(Integer.MAX_VALUE) private val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration. Automatically set if print-data-log is enabled.") - private val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") + private val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement org.apache.kafka.tools.api.Decoder trait. Custom jar should be available in kafka/libs directory.") .withOptionalArg() .ofType(classOf[java.lang.String]) - .defaultsTo("kafka.serializer.StringDecoder") - private val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.") + .defaultsTo("org.apache.kafka.tools.api.StringDecoder") Review Comment: `classOf[StringDecoder].getName` ########## core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala: ########## @@ -732,3 +751,9 @@ class DumpLogSegmentsTest { } } } + +object DumpLogSegmentsTest { + class TestDecoder(props: VerifiableProperties = null) extends kafka.serializer.Decoder[Array[Byte]] { Review Comment: Do we need the default value `null` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
