This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a6b9adc21c [SPARK-46633][SQL] Fix Avro reader to handle zero-length blocks
3a6b9adc21c is described below

commit 3a6b9adc21c25b01746cc31f3b75fe061a63204c
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Wed Jan 10 09:47:14 2024 +0900

    [SPARK-46633][SQL] Fix Avro reader to handle zero-length blocks
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a bug in the Avro connector's handling of zero-length blocks.
    If a file contains such a block, the Avro connector may return an incorrect
    number of records or, in some cases, an empty DataFrame.
    
    This was caused by the way the `hasNextRow` check worked. Avro's `hasNext`
    method loads the next block, so if that block is empty it returns false and
    the Avro connector stops reading rows. Instead, the reader should keep
    checking subsequent blocks until it reaches the sync point.
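
    As a rough sketch of the corrected control flow (simplified from the
    `AvroUtils.scala` diff below, using the same names that appear there):

    ```scala
    // Keep scanning blocks until the reader is past the sync point; an empty
    // block makes hasNext() return false without meaning "end of split".
    def hasNextRow: Boolean = {
      while (!completed && currentRow.isEmpty) {
        if (fileReader.pastSync(stopPosition)) {
          fileReader.close()          // crossed the sync point: this split is done
          completed = true
          currentRow = None
        } else if (fileReader.hasNext()) {
          val record = fileReader.next()
          currentRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]]
        }
        // else: empty block before the sync point -- loop and check the next block
      }
      currentRow.isDefined
    }
    ```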
    
    ### Why are the changes needed?
    
    Fixes a correctness bug in Avro connector.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I added a unit test and a generated sample file to verify the fix. Without
    the patch, reading such a file returns fewer records than the actual count,
    or even zero, depending on the maxPartitionBytes config.
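
    For illustration, this is how the bug would surface to a user (the path is
    hypothetical; `spark.sql.files.maxPartitionBytes` is the config referenced above):

    ```scala
    // Reading an Avro file that contains zero-length blocks.
    val df = spark.read.format("avro").load("/path/to/file_with_empty_blocks.avro")

    // Without the fix, this count could be lower than the real number of records,
    // or even 0, depending on spark.sql.files.maxPartitionBytes.
    df.count()
    ```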
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44635 from sadikovi/SPARK-46633.
    
    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../main/scala/org/apache/spark/sql/avro/AvroUtils.scala    |  9 ++++++---
 connector/avro/src/test/resources/empty_blocks.avro         |  5 +++++
 .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala    | 13 +++++++++++++
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 27a5b918fc9..25e6aec4d84 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -182,16 +182,19 @@ private[sql] object AvroUtils extends Logging {
 
     def hasNextRow: Boolean = {
       while (!completed && currentRow.isEmpty) {
-        val r = fileReader.hasNext && !fileReader.pastSync(stopPosition)
-        if (!r) {
+        if (fileReader.pastSync(stopPosition)) {
           fileReader.close()
           completed = true
           currentRow = None
-        } else {
+        } else if (fileReader.hasNext()) {
           val record = fileReader.next()
           // the row must be deserialized in hasNextRow, because AvroDeserializer#deserialize
           // potentially filters rows
           currentRow = deserializer.deserialize(record).asInstanceOf[Option[InternalRow]]
+        } else {
+          // In this case, `fileReader.hasNext()` returns false but we are not past sync point yet.
+          // This means empty blocks, we need to continue reading the file in case there are non
+          // empty blocks or we are past sync point.
         }
       }
       currentRow.isDefined
diff --git a/connector/avro/src/test/resources/empty_blocks.avro b/connector/avro/src/test/resources/empty_blocks.avro
new file mode 100644
index 00000000000..85d96f4af71
--- /dev/null
+++ b/connector/avro/src/test/resources/empty_blocks.avro
[binary Avro test data omitted: the generated empty_blocks.avro sample contains zero-length blocks alongside 58 records, per the test below]
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 8a563423563..3d481d1d731 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -2716,6 +2716,19 @@ abstract class AvroSuite
     assert(AvroOptions.isValidOption("datetimeRebaseMode"))
     assert(AvroOptions.isValidOption("enableStableIdentifiersForUnionType"))
   }
+
+  test("SPARK-46633: read file with empty blocks") {
+    for (maxPartitionBytes <- Seq(100, 100000, 100000000)) {
+      withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> s"$maxPartitionBytes") {
+        val file = getResourceAvroFilePath("empty_blocks.avro")
+        val df = spark.read.format("avro").load(file)
+        val count = df.count()
+        val records = df.collect()
+        assert(count == 58)
+        assert(count == records.length)
+      }
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
