bithw1 opened a new issue #2304:
URL: https://github.com/apache/hudi/issues/2304


   Hi,
   I have the following simple code (included at the end of this description) that does an upsert 100 times, with auto clean disabled during the writes. When the writes are done, there are about 100 parquet files in the Hoodie table folder.
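   
   For reference, a minimal sketch of one way to count the parquet files under the table folder (a plain Hadoop `FileSystem` listing, nothing Hudi-specific; the `CountParquetFiles` name is just for illustration, and the path matches the repro code below):
   
   ```
   import org.apache.hadoop.fs.{FileSystem, Path}
   import org.apache.spark.sql.SparkSession
   
   // Rough sketch: recursively count the data files under the Hudi table folder,
   // e.g. once before and once after running the clean.
   object CountParquetFiles {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder.appName("CountParquetFiles").getOrCreate()
       val base_path = "/data/hudi_demo/hudi_hive_read_write_cow_disable_cleaner_1"
   
       val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
       val files = fs.listFiles(new Path(base_path), true) // true = recursive
       var parquetCount = 0
       while (files.hasNext) {
         if (files.next().getPath.getName.endsWith(".parquet")) parquetCount += 1
       }
       println(s"parquet files under $base_path: $parquetCount")
     }
   }
   ```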
   
   After I run `cleans run` in the hudi-cli, only 11 parquet files are left in the Hoodie table folder, which means the old file versions have been cleaned up.
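   
   I did not override any cleaner retention settings, so I assume the defaults apply. For clarity, here is a minimal sketch of how those settings could be pinned explicitly on the write path (the `ExplicitCleanerSettings` helper is just for illustration; the option keys are standard Hudi configs and the values are what I understand the defaults to be):
   
   ```
   import org.apache.spark.sql.{DataFrame, SaveMode}
   
   // Minimal sketch (not part of the repro): the same kind of write, but with the
   // cleaner-related options spelled out explicitly.
   object ExplicitCleanerSettings {
     def write(df: DataFrame, tableName: String, tablePath: String): Unit = {
       df.write.format("hudi")
         .option("hoodie.table.name", tableName)
         .option("hoodie.datasource.write.recordkey.field", "name")
         .option("hoodie.datasource.write.precombine.field", "creation_date")
         .option("hoodie.datasource.write.partitionpath.field", "dt")
         // Clean automatically after each commit instead of via hudi-cli.
         .option("hoodie.clean.automatic", "true")
         // KEEP_LATEST_COMMITS retains the file slices needed by the last N
         // commits; the newest file slice of each file group is never cleaned.
         .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
         .option("hoodie.cleaner.commits.retained", "10")
         .mode(SaveMode.Append)
         .save(tablePath)
     }
   }
   ```
   
   With one row per commit and (as I understand it) 10 retained commits by default, ending up with about 11 parquet files after `cleans run` would be consistent with that policy.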
   
   Then I run `select count(1) from hudi_hive_read_write_cow_disable_cleaner_1` and the result is still 101, so it looks like no data is lost after the clean. **I would like to confirm that no data is lost after cleaning up old file versions. In my case, I wrote 101 records and still get back 101 records after the clean, which matches my expectation.**
   
   Another observation:
   
   When I run `select _hoodie_file_name from hudi_hive_read_write_cow_disable_cleaner_1;`, the result shows the parquet file names, but some of those files have been deleted by the clean. I am not sure whether it is a bug that the query shows file names that no longer exist (I think it should show the file where the data currently resides). This also leads to another question: since the corresponding files have been deleted, where does the data in the Hive query result come from?
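   
   To double-check outside of Hive, a minimal sketch of reading the table back through the Hudi Spark datasource and listing the metadata columns (the `InspectFileNames` name and the partition glob are just for illustration; newer Hudi versions can `load(base_path)` directly):
   
   ```
   import org.apache.spark.sql.SparkSession
   
   // Minimal sketch (not part of the repro): snapshot-read the table via the
   // Hudi datasource and list the metadata columns, to compare the file names
   // reported there with what the Hive query returns after the clean.
   object InspectFileNames {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder.appName("InspectFileNames").getOrCreate()
       val base_path = "/data/hudi_demo/hudi_hive_read_write_cow_disable_cleaner_1"
   
       // Glob covers the single "dt" partition level plus the files underneath.
       val snapshot = spark.read.format("hudi").load(base_path + "/*/*")
   
       snapshot
         .select("_hoodie_commit_time", "_hoodie_file_name", "name")
         .orderBy("_hoodie_commit_time")
         .show(200, false)
     }
   }
   ```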
   
   
   ```
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
   import org.apache.hudi.index.HoodieIndex
   import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
   
   // Payload written by the test; the field names match the record key,
   // precombine and partition path options used below.
   case class MyOrder(name: String, price: String, creation_date: String, dt: String)
   
   object COWDisableCleanerTest {
     val spark = SparkSession.builder.appName("COWDisableCleanerTest")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
       .enableHiveSupport().getOrCreate()
   
     val hudi_table = "hudi_hive_read_write_cow_disable_cleaner_1"
   
     val base_path = s"/data/hudi_demo/$hudi_table"
   
     def run(df: DataFrame, round: Int): Unit = {
       val saveMode = round match {
         case 0 => SaveMode.Overwrite
         case _ => SaveMode.Append
       }
   
       df.write.format("hudi")
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
         .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
         .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
         .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
         .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
         .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
         .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000")
         .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
         .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
         .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
         .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
         .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "false")
         // Do not run the cleaner automatically during the 100 commits
         .option(HoodieCompactionConfig.AUTO_CLEAN_PROP, "false")
         .option("hoodie.insert.shuffle.parallelism", "2")
         .option("hoodie.upsert.shuffle.parallelism", "2")
         .mode(saveMode)
         .save(base_path)
     }
   
     def main(args: Array[String]): Unit = {
       import spark.implicits._
       val order = MyOrder("Key-0", "Price-0", "2020-11-18 14:43:00", "2020-11-19")
   
       // Create the table and insert the first row (round 0 uses Overwrite)
       run(spark.createDataset(Seq(order)).toDF(), 0)
   
       // Run 100 more times and insert 100 rows, one row per commit
       (1 to 100).foreach { i =>
         val order = MyOrder("Key-" + i, "Price-" + i, "2020-11-18 14:43:" + i, "2020-11-19")
         val insertData = spark.createDataset(Seq(order)).toDF()
         run(insertData, i)
       }
     }
   }
   ```