This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 558b3958806 [SPARK-39575][AVRO] add ByteBuffer#rewind after ByteBuffer#get in AvroDeserializer 558b3958806 is described below commit 558b395880673ec45bf9514c98983e50e21d9398 Author: wangzixuan.wzxuan <wangzixuan.wzx...@bytedance.com> AuthorDate: Sun Jun 26 21:05:08 2022 -0500 [SPARK-39575][AVRO] add ByteBuffer#rewind after ByteBuffer#get in AvroDeserializer ### What changes were proposed in this pull request? Add ByteBuffer#rewind after ByteBuffer#get in AvroDeserializer. ### Why are the changes needed? - HeapByteBuffer.get(bytes) copies the data from the current position to the limit into bytes and advances the position to the limit, so the next call returns empty bytes. - As a result, deserializing the same Avro record a second time with AvroDeserializer returns an InternalRow with an empty binary column when the record has a binary column. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a unit test in AvroCatalystDataConversionSuite. Closes #36973 from wzx140/avro-fix. Authored-by: wangzixuan.wzxuan <wangzixuan.wzx...@bytedance.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../apache/spark/sql/avro/AvroDeserializer.scala | 2 ++ .../sql/avro/AvroCatalystDataConversionSuite.scala | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 5bb51a92977..1192856ae77 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -195,6 +195,8 @@ private[sql] class AvroDeserializer( case b: ByteBuffer => val bytes = new Array[Byte](b.remaining) b.get(bytes) + // Do not forget to reset the position + b.rewind() bytes case b: Array[Byte] => b case other => diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index a43d171fb52..5c0d64b4d55 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -360,4 +360,25 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite None, new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema)) } + + test("AvroDeserializer with binary type") { + val jsonFormatSchema = + """ + |{ + | "type": "record", + | "name": "record", + | "fields" : [ + | {"name": "a", "type": "bytes"} + | ] + |} + """.stripMargin + val avroSchema = new Schema.Parser().parse(jsonFormatSchema) + val avroRecord = new GenericData.Record(avroSchema) + val bb = java.nio.ByteBuffer.wrap(Array[Byte](97, 48, 53)) + avroRecord.put("a", bb) + + val expected = InternalRow(Array[Byte](97, 48, 53)) + checkDeserialization(avroSchema, avroRecord, Some(expected)) + checkDeserialization(avroSchema, avroRecord, Some(expected)) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org