[ 
https://issues.apache.org/jira/browse/KAFKA-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529034#comment-16529034
 ] 

ASF GitHub Bot commented on KAFKA-6293:
---------------------------------------

ethiebaut closed pull request #4282: KAFKA-6293 Support for Avro formatter in 
ConsoleConsumer
URL: https://github.com/apache/kafka/pull/4282
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 1223150a7b2..bbfb29f82ba 100644
--- a/build.gradle
+++ b/build.gradle
@@ -38,6 +38,9 @@ allprojects {
 
   repositories {
     mavenCentral()
+    maven {
+      url "http://packages.confluent.io/maven/"
+    }
   }
   
   apply plugin: 'idea'
@@ -553,6 +556,8 @@ project(':core') {
     compile libs.scalaLogging
     compile libs.zkclient
     compile libs.zookeeper
+    compile libs.avroSerializer
+    compile libs.schemaRegistryClient
 
     testCompile project(':clients').sourceSets.test.output
     testCompile libs.bcpkix
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 7d2d3710157..bea79672794 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -23,6 +23,10 @@ import java.util.concurrent.CountDownLatch
 import java.util.{Locale, Properties, Random}
 
 import com.typesafe.scalalogging.LazyLogging
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
+import io.confluent.kafka.serializers.KafkaAvroDeserializer
 import joptsimple._
 import kafka.api.OffsetRequest
 import kafka.common.{MessageFormatter, StreamEndException}
@@ -31,6 +35,7 @@ import kafka.message._
 import kafka.metrics.KafkaMetricsReporter
 import kafka.utils._
 import kafka.utils.Implicits._
+import org.apache.avro.generic.GenericRecord
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
 import org.apache.kafka.common.errors.{AuthenticationException, 
WakeupException}
 import org.apache.kafka.common.record.TimestampType
@@ -345,6 +350,11 @@ object ConsoleConsumer extends Logging {
       .describedAs("consumer group id")
       .ofType(classOf[String])
 
+    val confluentServerOpt = 
parser.accepts(AvroMessageFormatter.CONFLUENT_SERVER_CONFIG, "The Confluent 
Schema Registry server port:host, e.g. 192.168.0.2:8081.")
+      .withRequiredArg
+      .describedAs("confluent schema registry")
+      .ofType(classOf[String])
+
     if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "The console consumer is a 
tool that reads data from Kafka and outputs it to standard output.")
 
@@ -376,6 +386,7 @@ object ConsoleConsumer extends Logging {
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
     val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = 
messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    val confluentServer = options.valueOf(confluentServerOpt)
 
     if (keyDeserializer != null && !keyDeserializer.isEmpty) {
       formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer)
@@ -383,6 +394,9 @@ object ConsoleConsumer extends Logging {
     if (valueDeserializer != null && !valueDeserializer.isEmpty) {
       
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer)
     }
+    if (confluentServer != null && !confluentServer.isEmpty) {
+      formatterArgs.setProperty(AvroMessageFormatter.CONFLUENT_SERVER_CONFIG, 
confluentServer)
+    }
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {
@@ -609,3 +623,29 @@ class ChecksumMessageFormatter extends MessageFormatter {
     output.println(topicStr + "checksum:" + chksum)
   }
 }
+
+object AvroMessageFormatter {
+  def CONFLUENT_SERVER_CONFIG = "confluent-server"
+}
+
+class AvroMessageFormatter extends MessageFormatter {
+
+  private var kafkaAvroDeserializer: KafkaAvroDeserializer = _
+
+  override def init(props: Properties) {
+    val schemaRegistryStr = 
props.getProperty(AvroMessageFormatter.CONFLUENT_SERVER_CONFIG)
+    if (schemaRegistryStr == null) {
+      System.err.println("Missing property: please set '" + 
AvroMessageFormatter.CONFLUENT_SERVER_CONFIG + "' property to <host>:<port>")
+      Exit.exit(1)
+    }
+    val schemaRegistryUrl = "http://" + schemaRegistryStr
+    val schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 100)
+    kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistry)
+  }
+
+  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], 
output: PrintStream): Unit = {
+    val value = kafkaAvroDeserializer.deserialize(null, 
consumerRecord.value()).asInstanceOf[GenericRecord]
+    output.println(value.toString)
+  }
+
+}
\ No newline at end of file
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 24362418b3a..ec30273f635 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -50,6 +50,7 @@ versions += [
   apacheda: "1.0.0",
   apacheds: "2.0.0-M24",
   argparse4j: "0.7.0",
+  avroSerializer: "3.2.0",
   bcpkix: "1.58",
   easymock: "3.5",
   jackson: "2.9.1",
@@ -67,6 +68,7 @@ versions += [
   reflections: "0.9.11",
   rocksDB: "5.7.3",
   scalatest: "3.0.4",
+  schemaRegistryClient: "3.2.0",
   scoverage: "1.3.1",
   slf4j: "1.7.25",
   snappy: "1.1.4",
@@ -87,6 +89,7 @@ libs += [
   apachedsLdifPartition: 
"org.apache.directory.server:apacheds-ldif-partition:$versions.apacheds",
   apachedsMavibotPartition: 
"org.apache.directory.server:apacheds-mavibot-partition:$versions.apacheds",
   apachedsJdbmPartition: 
"org.apache.directory.server:apacheds-jdbm-partition:$versions.apacheds",
+  avroSerializer: 
"io.confluent:kafka-avro-serializer:$versions.avroSerializer",
   bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
   easymock: "org.easymock:easymock:$versions.easymock",
   jacksonDatabind: 
"com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
@@ -111,6 +114,7 @@ libs += [
   scala: "org.scala-lang:scala-library:$versions.scala",
   scalaCompiler: "org.scala-lang:scala-compiler:$versions.scala",
   scalatest: "org.scalatest:scalatest_$versions.baseScala:$versions.scalatest",
+  schemaRegistryClient: 
"io.confluent:kafka-schema-registry-client:$versions.schemaRegistryClient",
   scoveragePlugin: 
"org.scoverage:scalac-scoverage-plugin_$versions.baseScala:$versions.scoverage",
   scoverageRuntime: 
"org.scoverage:scalac-scoverage-runtime_$versions.baseScala:$versions.scoverage",
   slf4jApi: "org.slf4j:slf4j-api:$versions.slf4j",


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support for Avro formatter in ConsoleConsumer With Confluent Schema Registry
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6293
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6293
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Eric Thiebaut-George
>            Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Add the ability to display Avro payloads when listening for messages in 
> kafka-console-consumer.sh.
> The proposed PR will display Avro payloads (in JSON) when executed with the 
> following parameters:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
> mytopic --confluent-server localhost:8081 --formatter 
> kafka.tools.AvroMessageFormatter
> PR: https://github.com/apache/kafka/pull/4282



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to