xiarixiaoyao edited a comment on pull request #2721:
URL: https://github.com/apache/hudi/pull/2721#issuecomment-806582989


   Test steps:
   
   Before the patch:
   
   Step 1:
   
   val df = spark.range(0, 1000000).toDF("keyid")
     .withColumn("col3", expr("keyid + 10000000"))
     .withColumn("p", lit(0))
     .withColumn("p1", lit(0))
     .withColumn("p2", lit(7))
     .withColumn("a1", lit(Array[String]("sb1", "rz")))
     .withColumn("a2", lit(Array[String]("sb1", "rz")))
   
   // bulk_insert 1,000,000 rows (keyid from 0 to 1000000)
   
   merge(df, 4, "default", "hive_9b",
     DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
   
   Step 2:
   
   val df = spark.range(0, 900000).toDF("keyid")
     .withColumn("col3", expr("keyid + 10000000"))
     .withColumn("p", lit(0))
     .withColumn("p1", lit(0))
     .withColumn("p2", lit(7))
     .withColumn("a1", lit(Array[String]("sb1", "rz")))
     .withColumn("a2", lit(Array[String]("sb1", "rz")))
   
   // delete 900,000 rows (keyid from 0 to 900000)
   
   delete(df, 4, "default", "hive_9b")
   
   Step 3:
   
   Query on beeline/spark-sql: select count(col3) from hive_9b_rt
   
   2021-03-25 15:33:29,029 | INFO  | main | RECORDS_OUT_OPERATOR_RS_3:1, RECORDS_OUT_INTERMEDIATE:1,  | Operator.java:1038
   2021-03-25 15:33:29,029 | INFO  | main | RECORDS_OUT_OPERATOR_RS_3:1, RECORDS_OUT_INTERMEDIATE:1,  | Operator.java:1038
   2021-03-25 15:33:29,029 | ERROR | main | Error running child : java.lang.StackOverflowError
       at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:83)
       at org.apache.parquet.column.values.plain.BinaryPlainValuesReader.readBytes(BinaryPlainValuesReader.java:39)
       at org.apache.parquet.column.impl.ColumnReaderBase$2$6.read(ColumnReaderBase.java:344)
       at org.apache.parquet.column.impl.ColumnReaderBase.readValue(ColumnReaderBase.java:503)
       at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:30)
       at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:409)
       at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
       at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
       at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
       at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
       at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.next(ParquetRecordReaderWrapper.java:159)
       at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.next(ParquetRecordReaderWrapper.java:41)
       at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:84)
       at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
       at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
       at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
       at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
       ... (RealtimeCompactedRecordReader.next at RealtimeCompactedRecordReader.java:106 repeats until the stack overflows)
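
   The repeated frame at RealtimeCompactedRecordReader.java:106 suggests that next() calls itself once for every base-file record whose key was deleted in the log, so the 900,000 consecutive deletes from step 2 exhaust the JVM stack. The sketch below is illustrative only (simplified Scala, not the actual Hudi reader; the record shape and the "keyid" field are assumptions) and contrasts a recursive skip with an iterative one that keeps the stack depth constant:

   // Illustrative sketch only -- NOT the actual RealtimeCompactedRecordReader code.
   trait BaseFileReader {
     def readNext(): Option[Map[String, Any]]   // None once the base file is exhausted
   }

   class CompactedReader(base: BaseFileReader, deletedKeys: Set[String]) {

     // Recursive skip of deleted records: one stack frame per consecutive deleted
     // key (the method is overridable, so the tail call is not optimized away).
     // This is how a long run of deleted keys can end in StackOverflowError.
     def nextRecursive(): Option[Map[String, Any]] =
       base.readNext() match {
         case Some(r) if deletedKeys(r("keyid").toString) => nextRecursive()
         case other                                       => other
       }

     // Iterative skip: constant stack depth no matter how many consecutive
     // records were deleted.
     def nextIterative(): Option[Map[String, Any]] = {
       var rec = base.readNext()
       while (rec.exists(r => deletedKeys(r("keyid").toString))) {
         rec = base.readNext()
       }
       rec
     }
   }

   Whether or not the actual fix in this PR takes this exact shape, the contrast shows why the failure in step 3 depends on how many consecutive records were deleted rather than on the total data size.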
   
   After the patch:
        select count(col3) from hive_9b_rt
        +---------+
        |   _c0   |
        +---------+
        | 100000  |
        +---------+
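
   As an optional cross-check (not part of the original steps; the path and glob are my assumptions based on the save() path in the helpers), the snapshot count can also be read back through the Spark datasource:

   // Hedged cross-check: load the table base path directly with Spark.
   // Older Hudi versions need a partition-depth glob (three levels here: p/p1/p2).
   val snapshot = spark.read.format("hudi").load("/tmp/default/hive_9b/*/*/*")
   println(snapshot.count())   // should agree with the 100000 reported by Hive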

   Step 4:

   val df = spark.range(900000, 1000000).toDF("keyid")
     .withColumn("col3", expr("keyid + 10000000"))
     .withColumn("p", lit(0))
     .withColumn("p1", lit(0))
     .withColumn("p2", lit(7))
     .withColumn("a1", lit(Array[String]("sb1", "rz")))
     .withColumn("a2", lit(Array[String]("sb1", "rz")))

   // delete the remaining 100,000 rows (keyid from 900000 to 1000000)
   delete(df, 4, "default", "hive_9b")

   Query on spark-sql / Hive beeline:

   select count(col3) from hive_9b_rt;
   +------+
   | _c0  |
   +------+
   | 0    |
   +------+
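
   For reference, the two helpers below were run from spark-shell and rely roughly on the following imports (a best guess from the identifiers used; exact packages can differ between Hudi versions):

   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.SaveMode.{Append, Overwrite}
   import org.apache.spark.sql.functions.{expr, lit}
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.DataSourceWriteOptions._           // PRECOMBINE_FIELD_OPT_KEY, HIVE_* keys, ...
   import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
   import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
   import org.apache.hudi.keygen.ComplexKeyGenerator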
   
   delete function:
   def delete(df: DataFrame, par: Int, db: String, tableName: String,
              tableType: String = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL): Unit = {
     df.write.format("hudi").
       option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
       option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
       option(PRECOMBINE_FIELD_OPT_KEY, "col3").
       option(RECORDKEY_FIELD_OPT_KEY, "keyid").
       option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
       option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[ComplexKeyGenerator].getName).
       option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete").
       option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
       option("hoodie.insert.shuffle.parallelism", par.toString).
       option("hoodie.upsert.shuffle.parallelism", par.toString).
       option("hoodie.delete.shuffle.parallelism", par.toString).
       option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
       option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2").
       option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
       option("hoodie.datasource.hive_sync.support_timestamp", "true").
       option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
       option(HIVE_USE_JDBC_OPT_KEY, "false").
       option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
       option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
       option(TABLE_NAME, tableName).
       mode(Append).
       save(s"/tmp/${db}/${tableName}")
   }
   
   merge function:
   def merge(df: org.apache.spark.sql.DataFrame, par: Int, db: String, tableName: String,
             tableType: String = DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
             hivePartitionExtract: String = "org.apache.hudi.hive.MultiPartKeysValueExtractor",
             op: String = "upsert"): Unit = {
     val mode = if (op.equals("bulk_insert")) {
       Overwrite
     } else {
       Append
     }
     df.write.format("hudi").
       option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
       option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
       option(PRECOMBINE_FIELD_OPT_KEY, "col3").
       option(RECORDKEY_FIELD_OPT_KEY, "keyid").
       option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
       option(DataSourceWriteOptions.OPERATION_OPT_KEY, op).
       option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[ComplexKeyGenerator].getName).
       option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
       option("hoodie.metadata.enable", "false").
       option("hoodie.insert.shuffle.parallelism", par.toString).
       option("hoodie.upsert.shuffle.parallelism", par.toString).
       option("hoodie.delete.shuffle.parallelism", par.toString).
       option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
       option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2").
       option("hoodie.datasource.hive_sync.support_timestamp", "true").
       option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
       option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, hivePartitionExtract).
       option(HIVE_USE_JDBC_OPT_KEY, "false").
       option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
       option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
       option(TABLE_NAME, tableName).
       mode(mode).
       save(s"/tmp/${db}/${tableName}")
   }
   

