This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3a6b9adc21c [SPARK-46633][SQL] Fix Avro reader to handle zero-length blocks 3a6b9adc21c is described below commit 3a6b9adc21c25b01746cc31f3b75fe061a63204c Author: Ivan Sadikov <ivan.sadi...@databricks.com> AuthorDate: Wed Jan 10 09:47:14 2024 +0900 [SPARK-46633][SQL] Fix Avro reader to handle zero-length blocks ### What changes were proposed in this pull request? This PR fixes a bug in the Avro connector with regard to zero-length blocks. If a file contains one of these blocks, the Avro connector may return an incorrect number of records or even an empty DataFrame in some cases. This was due to the way the `hasNextRow` check worked. The `hasNext` method in Avro loads the next block, so if that block is empty, it returns false and the Avro connector stops reading rows. Instead, the connector should keep checking subsequent blocks until it is past the sync point. ### Why are the changes needed? Fixes a correctness bug in the Avro connector. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added a unit test and a generated sample file to verify the fix. Without the patch, reading such a file would return fewer records than the actual number, or even 0 (depending on the maxPartitionBytes config). ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44635 from sadikovi/SPARK-46633. 
Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../main/scala/org/apache/spark/sql/avro/AvroUtils.scala | 9 ++++++--- connector/avro/src/test/resources/empty_blocks.avro | 5 +++++ .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 13 +++++++++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index 27a5b918fc9..25e6aec4d84 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -182,16 +182,19 @@ private[sql] object AvroUtils extends Logging { def hasNextRow: Boolean = { while (!completed && currentRow.isEmpty) { - val r = fileReader.hasNext && !fileReader.pastSync(stopPosition) - if (!r) { + if (fileReader.pastSync(stopPosition)) { fileReader.close() completed = true currentRow = None - } else { + } else if (fileReader.hasNext()) { val record = fileReader.next() // the row must be deserialized in hasNextRow, because AvroDeserializer#deserialize // potentially filters rows currentRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]] + } else { + // In this case, `fileReader.hasNext()` returns false but we are not past sync point yet. + // This means empty blocks, we need to continue reading the file in case there are non + // empty blocks or we are past sync point. 
} } currentRow.isDefined diff --git a/connector/avro/src/test/resources/empty_blocks.avro b/connector/avro/src/test/resources/empty_blocks.avro new file mode 100644 index 00000000000..85d96f4af71 --- /dev/null +++ b/connector/avro/src/test/resources/empty_blocks.avro @@ -0,0 +1,5 @@ +Obj�6decoder.shape.version.patch0,decoder.shape.commitidP32d3df6520fbab9f829c638602bfcaf57a36af3a2decoder.shape.fingerprint�18b0f3cb011ab5450a2eb866a48017b178f421c495cfea8e014a892a2e0af6c4$decoder.shape.usid ea88d6ea20b60173cmesg_shape.id118 cmesg_shape.name,testAvroMessageAAAAAAA(cmesg_shape.bytesize21.cmesg_shape.fingerprint�5d0906d6748d14d2a4b65b5a18dc05ba39e2def91236c3d0d08c577afe38b0180decoder.software.version&cmessage-decoder==1_weoiwasd2weroqw_asdmjkjsdf_2p1_gccgggg.file.path�/tmp/aaaaaa/bbbbbbbbbbb/ccccccccc/split-ggggs/5d0f6168_20231107134036/20231107161711.653/tmp_28dycwr-asd-ed-123-234-128-2.gggggggg.file.mode +batch gggg.file.st_ino(-4344181839388196375"gggg.file.st_size169201408$gggg.file.st_mtime.1998-02-15 16:26:43.000_col_15111.222.333.4xyz_idHe4ed94eb-bfbd-458e-85af-cf1a7245a254 hrr_idhrr-5d0f6168ofname20231107134036*gggg.decode_timestamp01998-02-15 16:28:02.264Zgggg.file.path�s3://test-bucket-abcde/aa/bbbb/ccccccccccccc/ddddddddddddddddddd/eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee/fffffffffffffffffffffff.gggg.hh"gggg.file.st_size40788641gggg.location�s3://test-bucket-abcde/aa/bbbb/ccccccccccccc/ddddddddddddddddddd/eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee/fffffffffffffffffffffff.gggg.hh2gggg.record_end_timestamp01998-02-15 13:45:35.000Z&gggg.xfer_timestamp01998-02-15 16:14:40.339Zgggg_bytes40788641port_idNoneraw>5d0f6168_20231107134 [...] 
eBytesJcmesg_shape.field.Column03x.math_typevalue02Bcmesg_shape.field.Column04xxx.idx3Fcmesg_shape.field.Column04xxx.ctypeboolVcmesg_shape.field.Column04xxx.struct_format>?\cmesg_shape.field.Column04xxx.meameameameameamvalue0001Ncmesg_shape.field.Column04xxx.math_typevalue004Dcmesg_shape.field.Column05xxxx.idx4Hcmesg_shape.field.Column05xxxx.ctypeUint32_tXcmesg_shape.field.Column05xxxx.struct_format>I^cmesg_shape.field.Column05xxxx.meameameameameam eBytesPcmesg_shape.field.Column05xxxx.math_typevalue02(gggg.first_timestamp01998-02-15 13:40:36.004ZZcmesg_shape.field.Column06xxxxxxxxxxxxxxx.idx5^cmesg_shape.field.Column06xxxxxxxxxxxxxxx.ctype2EControllableElementNamesncmesg_shape.field.Column06xxxxxxxxxxxxxxx.struct_format>Btcmesg_shape.field.Column06xxxxxxxxxxxxxxx.meameameameameamvalue0001fcmesg_shape.field.Column06xxxxxxxxxxxxxxx.math_typeNoneZcmesg_shape.field.Column07xxxxxxxxxxxxxxx.idx6^cmesg_shape.field.Column07xxxxxxxxxx [...] 5�-�� +���������@�f�i+)w�d5�-�cX3u�L &&&j[�<�����M))�*)��0���������P����<����*��*��*9E���H���B���T�P�gf�$�C��ۺߝ,\ �f"Y���bU�122�������̫.�aڳ��H���y��yg�Z�)=] ��A������松'�af +������{�@�f�i+)w�d5�-�c�s~W�J&�EW�~l~�¦��h��de�jeihe``e`�dd`e�lehb�hd�fi��"ehi�ghf�ghb�g(��33h��!~fk��.������a`��*��Mz�vMz�#�?�0�����+=�@�f�i+)w�d5�-�c������L &&&j�n�����M))�*)��0���������P����<����*��*��*9E���H���B���T�P�gf�$�C|��Wk�X8.lG�����1�Xcdd6�1�5�o��2�0�[pd�wf4|oi��w��a���S ��a`X�Ok`ޙu��N�]f����3',E�g��C�T����;��̚�d��ڇi��a4غ�Ϲ�9?έG�ο����� �X�l��,�p�;�=&j��S��wn���z��Sv"Y�����T��c_�XV�C����I� j��M2m�2 ����R".�@�f�i+)w�d5�- �c�t��J&�[�zֶ<iaSJJ�JJ�2L��4�20�20T22�2O�24�J4�J��JN�2� ��4�34 ��341�3�C��Mz�vM��}�C�20���d��ڋ�T���̠Ir���.���, >�;��YK/m��w�|e�u�.3Оu�����쥊=P�l9�8gZ7G��Ä�O�"yg���� ������p��=&j�6P��w�,g�Y��������ض�@�f�i+)w�d5�-�c��a�L &&&j�^?��M))�*)��0���������P����<����*��*��*9E���H���B���T�P�gf�$�C��U�C;�Y�>"Y��pg:U�122�����,��13�9��%�w��νB=�|�p���#�� �- ���Q��on�c���ʹ�>#���ǃ�T�g��o��1c��r${L��=H{��6��}�mp�ݙ���5�n/N=k��`ly�f�4$kV�XAk`���˂u������Ծ 
+aϮ3O?Rўe��'��,����y��Q�h���̲�(����?2�@�f�i+)w�d5�-�c�����UL &&&j�f����M))�*)��0���������P����<����*��*��*9E���H���B���T�P�gf�$�C���E�=, ���a`hZAk���&=f�&�3��ȹ{�\��D~<�@�f�i+)w�d5�-�c�uu��UL &&&j۶�im}�¦��h��de�jeihe``e`�dd`e�lehb�hd�fi��"ehi�ghf�ghb�g(��33h��!~�sϢ����]א�a`���-5�122����t�T�M��1�0���_��/�7�S�;��������p��=k6S��w.�dy��3S�����y�@�f�i+)w�d5�-�cX��ȺUL && _�~�h}�¦��h��de�jeihe``e`�dd`e�lehb�hd�fi��"ehi�ghf�ghb�g(��322������Y��1�0�x�d��ڵ�T���̠Ir���?����p0�߽�7���[DE�ܛ�~��>�P��C�=&j]� ���S ��w&=d9r��E �����y�@�f�i+)w�d5�-�c�����UL&& 3�5�i}�¦��h��de�jeihe``e`�dd`e�lehb�hd�fi��"ehi�ghf�ghb�g(��322������L���g `X�l��ZC_5�143h��!�m㺧=, ��O=�䝆�'�R�;=��V�{g�1${LԾ���=P��a��읶GG�"Y3����Գ���]�,| ���a`��*��B�J?S����,�t�G�gʼ='�h��g�3 0t�C��D��&�X��o�kf�#��Ĺ�@�f�i+)w�d5�-�c�3�܋UL&& ��5\n}�¦��h��de�jeihe``e`�dd`e�lehb�hd�fi��"ehi�ghf�ghb�g(��322��������3�0�C��D�HO#5�143h��!~d��U�, w�-��dM��+w�gͺӌsv������;�R�X���Ƽ��}f�����y�@�f�i+)w�d5�-�cXp��UL &&&j���<o}�¦��h��de�jeihe``e`�dd`e�lehb�hd�fi��"ehi�ghf�ghb�g(��33h�� �� !~��m��^������H�00|�j��5FFf��]�^b���}f�����&=�@�f�i+)w�d5�-�c��eZ�j&�G N~h}�¦��h��de�jeihe``e`�dd`e�lehb�hd�fi��"ehi�ghf�ghb�g(��33h��!�����^/ G��C�V$k�l��5FFf��]����iI�f�����t�;�@�f�i+)w�d5�- \ No newline at end of file diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 8a563423563..3d481d1d731 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -2716,6 +2716,19 @@ abstract class AvroSuite assert(AvroOptions.isValidOption("datetimeRebaseMode")) assert(AvroOptions.isValidOption("enableStableIdentifiersForUnionType")) } + + test("SPARK-46633: read file with empty blocks") { + for (maxPartitionBytes <- Seq(100, 100000, 100000000)) { + withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> s"$maxPartitionBytes") { + val file = 
getResourceAvroFilePath("empty_blocks.avro") + val df = spark.read.format("avro").load(file) + val count = df.count() + val records = df.collect() + assert(count == 58) + assert(count == records.length) + } + } + } } class AvroV1Suite extends AvroSuite { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org