I am trying to convert Avro records whose "payload" field has type bytes into a JSON string
using Structured Streaming in Spark 2.1. Please see my code below.
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object AvroConvert {

  case class KafkaMessage(payload: String)

  val schemaString = """{
    "type" : "record",
    "name" : "HdfsEvent",
    "namespace" : "com.abc.def.domain.hdfs",
    "fields" : [ {
      "name" : "payload",
      "type" : {
        "type" : "bytes",
        "java-class" : "[B"
      }
    } ]
  }"""

  val messageSchema = new Schema.Parser().parse(schemaString)
  val reader = new GenericDatumReader[GenericRecord](messageSchema)

  // Binary decoder factory
  val decoder = DecoderFactory.get()

  // Register an implicit encoder for the map operation below
  implicit val encoder: Encoder[GenericRecord] = Encoders.kryo[GenericRecord]
  def main(args: Array[String]): Unit = {
    val KafkaBroker = "**.**.**.**:9092"
    val InTopic = "avro"

    // Get a Spark session
    val session = SparkSession
      .builder
      .master("local[*]")
      .appName("myapp")
      .getOrCreate()

    import session.implicits._

    // Load the stream from Kafka and decode each message value as Avro
    val data = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KafkaBroker)
      .option("subscribe", InTopic)
      .load()
      .select($"value".as[Array[Byte]])
      .map { d =>
        val rec = reader.read(null, decoder.binaryDecoder(d, null))
        // An Avro "bytes" field is returned as a java.nio.ByteBuffer
        val payload = new String(rec.get("payload").asInstanceOf[java.nio.ByteBuffer].array())
        KafkaMessage(payload)
      }

    val query = data.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
I am getting the below error.
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)
    at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I read suggestions to use DataFileStream instead of binaryDecoder, as in the Java snippet
below, but I was not successful using this in Scala.

DatumReader<GenericRecord> datumReader =
    new SpecificDatumReader<GenericRecord>(schema);
DataFileStream<GenericRecord> dataFileReader =
    new DataFileStream<GenericRecord>(inputStream, datumReader);
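For reference, here is roughly how I expected that to translate to Scala (an untested sketch;
it assumes each Kafka message value is a complete Avro container file, i.e. header plus
embedded schema plus data blocks, which may not be what my producer writes; if the messages
are raw datums, my understanding is that DataFileStream cannot read them):

import java.io.ByteArrayInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import scala.collection.JavaConverters._

// Read all records out of one Avro container file held in a byte array.
// The schema comes from the file header, so no schema argument is needed.
def readContainerFile(bytes: Array[Byte]): List[GenericRecord] = {
  val datumReader = new GenericDatumReader[GenericRecord]()
  val stream = new DataFileStream[GenericRecord](
    new ByteArrayInputStream(bytes), datumReader)
  try stream.iterator().asScala.toList
  finally stream.close()
}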
Once the bytes-typed "payload" has been converted to JSON, I plan to write it back to
another Kafka topic.
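Roughly what I have in mind for that write-back is below (also untested; the output topic
"json-out" and the checkpoint path are placeholders, and since the built-in Kafka sink only
arrived in Spark 2.2, I understand that on 2.1 I would need a custom ForeachWriter instead):

// The Kafka sink expects a string or binary column named "value"
val out = data
  .select($"payload".as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KafkaBroker)
  .option("topic", "json-out")                   // placeholder topic name
  .option("checkpointLocation", "/tmp/chk-avro") // placeholder path
  .start()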
Any help on this is much appreciated. Thank you!
Revin