konradwudkowski opened a new issue #2265:
URL: https://github.com/apache/hudi/issues/2265


   Writing a dataframe with an array column causes Hudi to write broken parquet when any of the arrays contains a `null`.
   
   **To Reproduce**
   
   Steps to reproduce (using pyspark here):
   
   1. Create a spark dataframe where one column is an array, e.g.:
   
   ```
   spark_df = spark.createDataFrame([
       (1, '2020/10/29', ['RG215NY', 'LA12JE']),
       (2, '2020/10/29', []),
       (3, '2020/10/29', None),
       (4, '2020/10/29', [None]),  # this row will break things, as would [None, None] or [None, 'ABC']
       (5, '2020/10/29', ['ABC123']), 
   ], ['hudi_key', 'hudi_partition', 'postcodes'])
   ```
   
   2. Write it as hudi with spark
   
   ```
   hudi_options = {
       'hoodie.table.type': 'COPY_ON_WRITE',
       'hoodie.table.name': "data",
       'hoodie.datasource.write.recordkey.field': 'hudi_key',
       'hoodie.datasource.write.precombine.field': 'hudi_partition',
       'hoodie.datasource.write.partitionpath.field': 'hudi_partition',
       'hoodie.datasource.write.operation': 'upsert',
       'hoodie.upsert.shuffle.parallelism': 200,
       'hoodie.consistency.check.enabled': True
   }
   
   
spark_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(hudi_s3_prefix)
   ```
   
   3. Also write the same df as pure parquet for comparison
   
   ```
   spark_df.write.parquet(parquet_s3_prefix, 'overwrite')
   ```
   
   4. Hudi uses parquet as its underlying format, so find the parquet file it wrote on S3 and keep a reference to it; do the same for the pure-parquet file from step 3 (see the sketch below).
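
   A minimal sketch of locating those files, assuming boto3 is available; the bucket name and key prefix are placeholders, adjust them to wherever `hudi_s3_prefix` and `parquet_s3_prefix` point:

   ```
   import boto3

   # list every parquet file under the Hudi table prefix and keep the full s3:// paths
   s3 = boto3.client('s3')
   resp = s3.list_objects_v2(Bucket='my-bucket', Prefix='path/to/hudi-table/')
   hudi_parquet_files = [f"s3://my-bucket/{obj['Key']}"
                         for obj in resp.get('Contents', [])
                         if obj['Key'].endswith('.parquet')]
   hudi_parquet_file = hudi_parquet_files[0]  # the file inspected in step 5
   ```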
   
   5. Read the Hudi-written parquet file identified in step 4 (both reads are sketched below):
   - with Spark (`spark.read.schema(spark_df.schema).parquet(...)`) some records are missing (which ones appears to be nondeterministic, and it can be all of them), e.g. only 3 out of 5 are returned
   - with pyarrow (and fastparquet) it fails with an error complaining about the number of values in a column, e.g. "ArrowInvalid: Column 7 named postcodes expected length 5 but got length 3" - the number tallies with how many rows Spark saw.
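
   A minimal sketch of both reads; `hudi_parquet_file` stands in for the path found in step 4 (reading an s3:// path directly with pyarrow typically needs s3fs installed, or download the file locally first):

   ```
   import pyarrow.parquet as pq

   # Spark read: silently drops rows (e.g. only 3 of the 5 come back)
   spark.read.schema(spark_df.schema).parquet(hudi_parquet_file).show()

   # pyarrow read: raises ArrowInvalid complaining about the postcodes column length
   table = pq.read_table(hudi_parquet_file)
   ```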
   
   6. Load the data with Hudi 
   
   ```
   spark.read.format('hudi').schema(spark_df.schema).load(f"{hudi_s3_prefix}/*/*/*")
   # (globbing over the year/month/day partition directories in this example)
   ```
   
   The dataframe returned will be empty :(
   
   7. The parquet file written in step 3 (plain parquet, no Hudi) reads back fine, as sketched below.
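
   For comparison, a minimal sketch of reading the plain-parquet copy; `plain_parquet_file` is a placeholder for the corresponding part file under `parquet_s3_prefix`:

   ```
   import pyarrow.parquet as pq

   # both readers return all 5 rows with the array column intact
   spark.read.parquet(parquet_s3_prefix).show()
   pq.read_table(plain_parquet_file).to_pandas()
   ```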
   
   
   **Bonus problems**
   
   Let's modify the dataframe so that it no longer contains the `[None]` row:
   
   ```
   spark_df = spark.createDataFrame([
       (1, '2020/10/29', ['RG215NY', 'LA12JE']),
       (2, '2020/10/29', []),
       (3, '2020/10/29', None),
       #(4, '2020/10/29', [None]),
       (5, '2020/10/29', ['ABC123']),
   ], ['hudi_key', 'hudi_partition', 'postcodes'])
   ```
   
   When we repeat the steps above and save with Hudi (overwriting) to *the same* location, we can correctly read the data with both `spark.read.format('hudi').load(...)` and `spark.read.parquet(...)`, but `pyarrow` returns data with the array column mangled - rows missing and values moved around between rows, e.g.:
   
   ```
   hudi_key     hudi_partition      postcodes
   2            2020/10/29          []
   5            2020/10/29          [None]
   1            2020/10/29          [ABC123, RG215NY]  # note: ABC123 was the postcode of hudi_key=5, which now has no postcode!
   3            2020/10/29          None
   ```
   
   
   **Expected behavior**
   
   It should be possible to write a dataframe with Hudi where one column is an array type and some of those arrays contain null(s).
   
   **Environment Description**
   
   * AWS EMR version: 5.30.1 
   
   * Hudi version : 0.5.2 and 0.6.0
   
   * Spark version : 2.4.5
   
   * Hive version : 2.3.6
   
   * Hadoop version : 2.8.5
   
   * Storage (HDFS/S3/GCS..) : S3 
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Overwrite vs append
   
   If we start with some valid Hudi data and then use mode('append') to add more data whose array column contains nulls, the new data is not added, but the old data stays and remains readable. Adding "good" (no nulls) data afterwards also works. The parquet file for the "bad" batch is broken and cannot be read correctly, similarly to what was described above.
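
   A minimal sketch of that append scenario, reusing hudi_options from step 2; `good_df` and `bad_df` are hypothetical dataframes with the same schema as `spark_df`, where only `bad_df` has nulls inside its array column:

   ```
   # write a valid batch first, then append a batch whose array column contains nulls;
   # the appended rows never show up, but the original data stays readable
   good_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(hudi_s3_prefix)
   bad_df.write.format("hudi").options(**hudi_options).mode("append").save(hudi_s3_prefix)

   spark.read.format("hudi").load(f"{hudi_s3_prefix}/*/*/*").show()  # only good_df rows come back
   ```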
   

