[ https://issues.apache.org/jira/browse/KAFKA-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529034#comment-16529034 ]
ASF GitHub Bot commented on KAFKA-6293: --------------------------------------- ethiebaut closed pull request #4282: KAFKA-6293 Support for Avro formatter in ConsoleConsumer URL: https://github.com/apache/kafka/pull/4282 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index 1223150a7b2..bbfb29f82ba 100644 --- a/build.gradle +++ b/build.gradle @@ -38,6 +38,9 @@ allprojects { repositories { mavenCentral() + maven { + url "http://packages.confluent.io/maven/" + } } apply plugin: 'idea' @@ -553,6 +556,8 @@ project(':core') { compile libs.scalaLogging compile libs.zkclient compile libs.zookeeper + compile libs.avroSerializer + compile libs.schemaRegistryClient testCompile project(':clients').sourceSets.test.output testCompile libs.bcpkix diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 7d2d3710157..bea79672794 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -23,6 +23,10 @@ import java.util.concurrent.CountDownLatch import java.util.{Locale, Properties, Random} import com.typesafe.scalalogging.LazyLogging +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient +import io.confluent.kafka.schemaregistry.client.SchemaMetadata +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient +import io.confluent.kafka.serializers.KafkaAvroDeserializer import joptsimple._ import kafka.api.OffsetRequest import kafka.common.{MessageFormatter, StreamEndException} @@ -31,6 +35,7 @@ import kafka.message._ import kafka.metrics.KafkaMetricsReporter import kafka.utils._ import kafka.utils.Implicits._ +import org.apache.avro.generic.GenericRecord 
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.common.errors.{AuthenticationException, WakeupException} import org.apache.kafka.common.record.TimestampType @@ -345,6 +350,11 @@ object ConsoleConsumer extends Logging { .describedAs("consumer group id") .ofType(classOf[String]) + val confluentServerOpt = parser.accepts(AvroMessageFormatter.CONFLUENT_SERVER_CONFIG, "The Confluent Schema Registry server host:port, e.g. 192.168.0.2:8081.") + .withRequiredArg + .describedAs("confluent schema registry") + .ofType(classOf[String]) + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") @@ -376,6 +386,7 @@ object ConsoleConsumer extends Logging { val valueDeserializer = options.valueOf(valueDeserializerOpt) val isolationLevel = options.valueOf(isolationLevelOpt).toString val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] + val confluentServer = options.valueOf(confluentServerOpt) if (keyDeserializer != null && !keyDeserializer.isEmpty) { formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer) @@ -383,6 +394,9 @@ object ConsoleConsumer extends Logging { if (valueDeserializer != null && !valueDeserializer.isEmpty) { formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer) } + if (confluentServer != null && !confluentServer.isEmpty) { + formatterArgs.setProperty(AvroMessageFormatter.CONFLUENT_SERVER_CONFIG, confluentServer) + } formatter.init(formatterArgs) if (useOldConsumer) { @@ -609,3 +623,29 @@ class ChecksumMessageFormatter extends MessageFormatter { output.println(topicStr + "checksum:" + chksum) } } + +object AvroMessageFormatter { + def CONFLUENT_SERVER_CONFIG = "confluent-server" +} + +class AvroMessageFormatter extends MessageFormatter { + + private var kafkaAvroDeserializer: 
KafkaAvroDeserializer = _ + + override def init(props: Properties) { + val schemaRegistryStr = props.getProperty(AvroMessageFormatter.CONFLUENT_SERVER_CONFIG) + if (schemaRegistryStr == null) { + System.err.println("Missing property: please set '" + AvroMessageFormatter.CONFLUENT_SERVER_CONFIG + "' property to <host>:<port>") + Exit.exit(1) + } + val schemaRegistryUrl = "http://" + schemaRegistryStr + val schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 100) + kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistry) + } + + def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { + val value = kafkaAvroDeserializer.deserialize(null, consumerRecord.value()).asInstanceOf[GenericRecord] + output.println(value.toString) + } + +} \ No newline at end of file diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 24362418b3a..ec30273f635 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -50,6 +50,7 @@ versions += [ apacheda: "1.0.0", apacheds: "2.0.0-M24", argparse4j: "0.7.0", + avroSerializer: "3.2.0", bcpkix: "1.58", easymock: "3.5", jackson: "2.9.1", @@ -67,6 +68,7 @@ versions += [ reflections: "0.9.11", rocksDB: "5.7.3", scalatest: "3.0.4", + schemaRegistryClient: "3.2.0", scoverage: "1.3.1", slf4j: "1.7.25", snappy: "1.1.4", @@ -87,6 +89,7 @@ libs += [ apachedsLdifPartition: "org.apache.directory.server:apacheds-ldif-partition:$versions.apacheds", apachedsMavibotPartition: "org.apache.directory.server:apacheds-mavibot-partition:$versions.apacheds", apachedsJdbmPartition: "org.apache.directory.server:apacheds-jdbm-partition:$versions.apacheds", + avroSerializer: "io.confluent:kafka-avro-serializer:$versions.avroSerializer", bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix", easymock: "org.easymock:easymock:$versions.easymock", jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson", @@ -111,6 +114,7 @@ libs += [ scala: 
"org.scala-lang:scala-library:$versions.scala", scalaCompiler: "org.scala-lang:scala-compiler:$versions.scala", scalatest: "org.scalatest:scalatest_$versions.baseScala:$versions.scalatest", + schemaRegistryClient: "io.confluent:kafka-schema-registry-client:$versions.schemaRegistryClient", scoveragePlugin: "org.scoverage:scalac-scoverage-plugin_$versions.baseScala:$versions.scoverage", scoverageRuntime: "org.scoverage:scalac-scoverage-runtime_$versions.baseScala:$versions.scoverage", slf4jApi: "org.slf4j:slf4j-api:$versions.slf4j", ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support for Avro formatter in ConsoleConsumer With Confluent Schema Registry > ---------------------------------------------------------------------------- > > Key: KAFKA-6293 > URL: https://issues.apache.org/jira/browse/KAFKA-6293 > Project: Kafka > Issue Type: Improvement > Components: core > Reporter: Eric Thiebaut-George > Priority: Major > Original Estimate: 2h > Remaining Estimate: 2h > > Add the ability to display Avro payloads when listening for messages in > kafka-console-consumer.sh. > The proposed PR will display Avro payloads (in JSON) when executed with the > following parameters: > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic > mytopic --confluent-server localhost:8081 --formatter > kafka.tools.AvroMessageFormatter > PR: https://github.com/apache/kafka/pull/4282 -- This message was sent by Atlassian JIRA (v7.6.3#76005)