Bruce Robbins created SPARK-33314:
-------------------------------------

             Summary: Avro reader drops rows
                 Key: SPARK-33314
                 URL: https://issues.apache.org/jira/browse/SPARK-33314
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.0
            Reporter: Bruce Robbins


Under certain circumstances, the V1 Avro reader drops rows. For example:
{noformat}
scala> val df = spark.range(0, 25).toDF("index")
df: org.apache.spark.sql.DataFrame = [index: bigint]

scala> df.write.mode("overwrite").format("avro").save("index_avro")

scala> val loaded = spark.read.format("avro").load("index_avro")
loaded: org.apache.spark.sql.DataFrame = [index: bigint]

scala> loaded.collect.size
res1: Int = 25

scala> loaded.orderBy("index").collect.size
res2: Int = 17   <== expected 25

scala> loaded.orderBy("index").write.mode("overwrite").format("parquet").save("index_as_parquet")

scala> spark.read.parquet("index_as_parquet").count
res4: Long = 17

scala>
{noformat}
SPARK-32346 slightly refactored the AvroFileFormat and 
AvroPartitionReaderFactory to use a new iterator-like trait called 
AvroUtils#RowReader. RowReader#hasNextRow consumes a raw input record and 
stores the deserialized row for the next call to RowReader#nextRow. 
Unfortunately, hasNextRow is sometimes called twice before nextRow is called; the
second call consumes and buffers a new record, silently dropping the row buffered
by the first call (see
[BypassMergeSortShuffleWriter#write|https://github.com/apache/spark/blob/69c27f49acf2fe6fbc8335bde2aac4afd4188678/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java#L132],
which calls records.hasNext once before calling it again
[here|https://github.com/apache/spark/blob/69c27f49acf2fe6fbc8335bde2aac4afd4188678/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java#L155]).

RowReader consumes the Avro record in hasNextRow, rather than in nextRow, because
AvroDeserializer#deserialize may filter out the record, so hasNextRow may need to
consume several raw records before it finds one that produces a row.
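
To make the hazard concrete, here is a minimal sketch of the shape of the bug
(not the actual AvroUtils#RowReader; recordsRemain and deserializeNext are
hypothetical stand-ins for the underlying Avro file reader and
AvroDeserializer#deserialize):
{noformat}
// Minimal sketch of the buggy shape; not the real AvroUtils#RowReader.
// deserializeNext() stands in for consuming one raw Avro record and running
// AvroDeserializer#deserialize, which may filter the record out (hence Option).
trait RowReaderSketch[T] {
  protected def recordsRemain: Boolean        // hypothetical: raw records left?
  protected def deserializeNext(): Option[T]  // hypothetical: consume + deserialize

  private var pendingRow: Option[T] = None

  def hasNextRow: Boolean = {
    // BUG: every call starts consuming from scratch. A second hasNextRow call
    // before nextRow throws away the row buffered by the first call.
    pendingRow = None
    while (pendingRow.isEmpty && recordsRemain) {
      pendingRow = deserializeNext()
    }
    pendingRow.isDefined
  }

  def nextRow: T = pendingRow.getOrElse(throw new NoSuchElementException)
}
{noformat}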

Two possible fixes that I thought of:

1) Keep state in RowReader so that multiple calls to RowReader#hasNextRow with no
intervening call to RowReader#nextRow consume at most one Avro record. This
requires no changes to any code that extends RowReader, only to RowReader itself.
2) Move record consumption to RowReader#nextRow (so that RowReader#nextRow could
potentially return None) and wrap any iterator that extends RowReader in a new
iterator created by flatMap, which filters out the Nones and extracts the rows
from the Somes (see the sketch after this list). This requires changes to
AvroFileFormat and AvroPartitionReaderFactory as well as RowReader.
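
Under the same simplified model as the sketch above, option 2 might look like
this (again hypothetical; the flatMap/flatten wrapping is the part that
AvroFileFormat and AvroPartitionReaderFactory would each need to add):
{noformat}
// Sketch of option 2: nextRow does the consuming and may return None when the
// deserializer filters the record out; hasNextRow becomes side-effect free, so
// calling it any number of times is harmless.
trait OptionRowReaderSketch[T] {
  protected def recordsRemain: Boolean        // hypothetical stand-ins, as above
  protected def deserializeNext(): Option[T]

  def hasNextRow: Boolean = recordsRemain
  def nextRow(): Option[T] = deserializeNext()
}

// Hypothetical wrapper: expose the reader as Iterator[Option[T]], then
// flatten away the Nones.
def wrap[T](reader: OptionRowReaderSketch[T]): Iterator[T] =
  new Iterator[Option[T]] {
    def hasNext: Boolean = reader.hasNextRow
    def next(): Option[T] = reader.nextRow()
  }.flatten
{noformat}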

The first option seems simplest and most straightforward, and it doesn't require
changes to AvroFileFormat or AvroPartitionReaderFactory, only to
AvroUtils#RowReader. So I propose that one; a sketch follows.
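
Concretely, option 1 could look like this (same simplified model as above; the
real change would live in AvroUtils#RowReader):
{noformat}
// Sketch of option 1: only consume a raw record when no row is buffered, so
// any number of hasNextRow calls between nextRow calls consumes at most one.
trait FixedRowReaderSketch[T] {
  protected def recordsRemain: Boolean        // hypothetical stand-ins, as above
  protected def deserializeNext(): Option[T]

  private var pendingRow: Option[T] = None

  def hasNextRow: Boolean = {
    while (pendingRow.isEmpty && recordsRemain) {
      pendingRow = deserializeNext()
    }
    pendingRow.isDefined
  }

  def nextRow: T = {
    val row = pendingRow.getOrElse(throw new NoSuchElementException)
    pendingRow = None  // hand off the buffered row; the next hasNextRow reads again
    row
  }
}
{noformat}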


