bithw1 opened a new issue #2304: URL: https://github.com/apache/hudi/issues/2304
Hi, I have the following simple code that does an upsert 100 times (the code is at the end of this description), with auto clean disabled during writes. When the writes are done, there are about 100 parquet files in the Hudi table folder. After I run `cleans run` in hudi-cli, only 11 parquet files are left in the table folder, which means the old file versions have been cleaned up. Then I run `select count(1) from hudi_hive_read_write_cow_disable_cleaner_1` and the result is still 101, so it looks like no data is lost after the clean.

**I would like to confirm that no data is lost when old file versions are cleaned. In my case, I wrote 101 rows and still get back 101 rows after the clean, which matches my expectation.**

Another observation: when I run `` select `_hoodie_file_name` from hudi_hive_read_write_cow_disable_cleaner_1; ``, the result shows parquet file names, but some of those files were deleted by the clean. I am not sure whether it is a bug that the query shows the names of deleted files (I think it should show the name of the file where the data currently resides). This leads to another question: since the corresponding file has been deleted, where does the data in the Hive query result come from?

```
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Not shown in the original post; assumed shape, with field names inferred
// from the record key, precombine and partition options used below.
case class MyOrder(name: String, price: String, creation_date: String, dt: String)

object COWDisableCleanerTest {
  val spark = SparkSession.builder.appName("COWDisableCleanerTest")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .enableHiveSupport().getOrCreate()

  val hudi_table = "hudi_hive_read_write_cow_disable_cleaner_1"
  val base_path = s"/data/hudi_demo/$hudi_table"

  def run(df: DataFrame, round: Int) = {
    // Round 0 overwrites the table; every other round appends to it.
    val saveMode = round match {
      case 0 => SaveMode.Overwrite
      case _ => SaveMode.Append
    }

    df.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000")
      .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "false")
      // Do not clean during the 100 commits
      .option(HoodieCompactionConfig.AUTO_CLEAN_PROP, "false")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(saveMode)
      .save(base_path)
  }

  def main(args: Array[String]): Unit = {
    import spark.implicits._

    val order = MyOrder("Key-0", "Price-0", "2020-11-18 14:43:00", "2020-11-19")

    // Create the table and insert 1 row
    run(spark.createDataset(Seq(order)).toDF(), 1)

    // Run 100 times and insert 100 rows, one row per commit
    (1 to 100).foreach { i =>
      val order = MyOrder("Key-" + i, "Price-" + i, "2020-11-18 14:43:" + i, "2020-11-19")
      val insertData = spark.createDataset(Seq(order)).toDF()
      run(insertData, i)
    }
  }
}
```