[jira] [Updated] (HUDI-7547) Simplification of archival, savepoint, cleaning interplays
[ https://issues.apache.org/jira/browse/HUDI-7547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-7547:
--------------------------------------
Description:
We are tackling the replace commit, savepoint, cleaner, and archival data consistency issues here: https://issues.apache.org/jira/browse/HUDI-7779

was: Related to https://issues.apache.org/jira/browse/HUDI-7779

> Simplification of archival, savepoint, cleaning interplays
> ----------------------------------------------------------
> Key: HUDI-7547
> URL: https://issues.apache.org/jira/browse/HUDI-7547
> Project: Apache Hudi
> Issue Type: Improvement
> Components: archiving, cleaning, savepoint, table-service
> Reporter: Vinoth Chandar
> Assignee: Vinoth Chandar
> Priority: Major
> Fix For: 1.0.0

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7779) Guarding archival to not archive unintended commits
[ https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-7779:
--------------------------------------
Description:
Archiving commits from the active timeline can lead to data consistency issues on rare occasions, so we should put proper guards in place to ensure we never make such unintended archivals. The major gap we want to close: if someone disables the cleaner, archival should account for the data consistency risk and bail out. We already have a base guard, where archival stops at the earliest commit to retain (ECTR) recorded in the latest clean commit metadata, but a few other scenarios need to be accounted for.

a. Leaving replace commits aside, consider regular commits and delta commits. Say the user configured the cleaner to retain 4 commits and the archival min/max to 5 and 6. After t10, the cleaner is supposed to clean up all file versions created at or before t6. Suppose the cleaner then does not run (for whatever reason) for the next 5 commits. Archival is still guarded up to the ECTR from the latest clean metadata.

Corner case to consider: a savepoint was added at, say, t3 and later removed, and the cleaner was never re-enabled. Archival would have stopped at t3 while the savepoint was present, but once the savepoint is removed, a subsequent archival run could archive commit t3, even though the file versions tracked at t3 have not yet been cleaned by the cleaner.

Reasoning: we are still fine with respect to data consistency. Until the cleaner runs again, those older file versions may be exposed to end users, but time travel queries are not intended for already-cleaned commits, so this is not an issue. None of the snapshot, time travel, or incremental queries will run into problems, as none of them should poll for t3. If the cleaner is later re-enabled, it will clean up the file versions tracked at t3; in the interim, some older file versions may still be visible to readers.

b. The trickier part involves replace commits. Since the replace commit metadata in the active timeline is what ensures the replaced file groups are ignored for reads, the cleaner must have fully cleaned those file groups before the replace commit is archived. Can this go wrong?

Corner case to consider: building on the scenario above, t3 has a savepoint and t4 is a replace commit that replaced file groups tracked in t3. The cleaner skips the files tracked by t3 (due to the savepoint) but cleans up t4, t5, and t6, so the ECTR points to t6. Now say the savepoint at t3 is removed while the cleaner is disabled. If archival runs in this state (since t3's savepoint is gone), it might archive t3 and t4.rc. That could lead to data duplicates, because both the replaced file groups and the new file groups from t4.rc would be exposed as valid file groups.

Summarizing the scenarios:
i. The replaced file group is never cleaned up: ECTR is less than this.rc, and we are fine.
ii. The replaced file group is cleaned up: ECTR is greater than this.rc, and it is safe to archive.
iii. Tricky: ECTR moved ahead of this.rc, but a savepoint prevented the full cleanup. After the savepoint is removed, archival must avoid archiving the replace commit in question. This is the gap we do not account for today.

We have three options for solving this.

Option A: Let the savepoint deletion flow clean up the files the savepoint was tracking.
Cons: Removing data files is not the savepoint's responsibility, so this may violate the single-responsibility rule. The cleanup would also have to do much of what the clean planner already does: build the file system view, determine whether a file is supposed to have been cleaned, and only then delete it. For example, a file group with only one file slice should not be cleaned up, and there are more scenarios like this.

Option B: Since archival is what can cause the consistency issue, let archival do the cleanup. We would need to account for concurrent cleans, failure and retry scenarios, etc., and we might also need to build the file system view to decide whether something needs cleaning before archiving it.
Cons: Again, the single-responsibility rule would be broken. It would be neat if the cleaner handled deleting data files and archival only handled deleting/archiving timeline files.

Option C: Similar to how the cleaner maintains EarliestCommitToRetain, let the cleaner track another piece of metadata named "EarliestCommitToArchive". Strictly speaking, ear
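The scenario-iii guard above can be sketched as follows. This is illustrative Python pseudologic, not Hudi's actual API: the `Timeline` container and the `fully_cleaned` set are hypothetical names standing in for the active timeline and for clean metadata about replaced file groups.

```python
from dataclasses import dataclass

@dataclass
class Timeline:
    instants: dict                  # instant -> action, e.g. "commit" / "replacecommit"
    savepoints: set                 # instants currently savepointed
    earliest_commit_to_retain: str  # ECTR from the latest clean metadata
    fully_cleaned: set              # replace commits whose replaced file groups are cleaned

def is_safe_to_archive(tl: Timeline, instant: str) -> bool:
    """An instant may be archived only if it is older than the ECTR, is not
    savepointed, and, for a replace commit, its replaced file groups have
    actually been cleaned (the scenario-iii guard)."""
    if instant in tl.savepoints:
        return False
    if instant >= tl.earliest_commit_to_retain:
        return False
    if tl.instants[instant] == "replacecommit" and instant not in tl.fully_cleaned:
        # ECTR moved past this replace commit, but a savepoint blocked the
        # clean; archiving now would expose duplicate file groups.
        return False
    return True
```

In the t3/t4 corner case above, t4.rc sits below the ECTR (t6) and has no savepoint, so the base guard alone would archive it; the extra `fully_cleaned` check is what blocks it.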
(hudi) branch master updated (4224d872719 -> b5913c65147)
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

 from 4224d872719 [MINOR] Update DOAP with 0.15.0 Release (#11409)
  add b5913c65147 [MINOR] Add logs whenever fallback to fs listing even with MDT enabled (#11407)

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java    | 1 +
 .../main/java/org/apache/hudi/metadata/HoodieTableMetadata.java    | 7 ++-
 2 files changed, 7 insertions(+), 1 deletion(-)
[jira] [Assigned] (HUDI-7824) Fix incremental partitions fetch logic when savepoint is removed for Incr cleaner
[ https://issues.apache.org/jira/browse/HUDI-7824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan reassigned HUDI-7824:
-----------------------------------------
Assignee: sivabalan narayanan

> Fix incremental partitions fetch logic when savepoint is removed for Incr cleaner
> ---------------------------------------------------------------------------------
> Key: HUDI-7824
> URL: https://issues.apache.org/jira/browse/HUDI-7824
> Project: Apache Hudi
> Issue Type: Bug
> Components: cleaning
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Major
> Labels: pull-request-available
>
> With the incremental cleaner, if a savepoint blocks cleanup of a commit and the cleaner moves ahead with respect to the earliest commit to retain, then when the savepoint is later removed, the cleaner should account for cleaning up the commit in question. Let's ensure the clean planner accounts for all partitions when such a savepoint removal is detected.
[jira] [Created] (HUDI-7824) Fix incremental partitions fetch logic when savepoint is removed for Incr cleaner
sivabalan narayanan created HUDI-7824:
--------------------------------------
Summary: Fix incremental partitions fetch logic when savepoint is removed for Incr cleaner
Key: HUDI-7824
URL: https://issues.apache.org/jira/browse/HUDI-7824
Project: Apache Hudi
Issue Type: Bug
Components: cleaning
Reporter: sivabalan narayanan

With the incremental cleaner, if a savepoint blocks cleanup of a commit and the cleaner moves ahead with respect to the earliest commit to retain, then when the savepoint is later removed, the cleaner should account for cleaning up the commit in question. Let's ensure the clean planner accounts for all partitions when such a savepoint removal is detected.
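The fix described above can be sketched as follows. This is an illustrative Python sketch with hypothetical names; the real logic lives in Hudi's Java clean planner, and how savepoint sets are tracked across cleans is an assumption here.

```python
def partitions_to_clean(incremental_partitions, all_partitions,
                        savepoints_at_last_clean, savepoints_now):
    """If any savepoint that existed during the last clean has since been
    removed, the incremental partition list may miss partitions whose files
    were skipped because of that savepoint, so fall back to considering
    every partition; otherwise take the incremental fast path."""
    removed_savepoints = set(savepoints_at_last_clean) - set(savepoints_now)
    if removed_savepoints:
        return sorted(all_partitions)          # full listing: a savepoint was removed
    return sorted(incremental_partitions)      # fast path: incremental partitions only
```

The key design point is that savepoint removal is the trigger for widening the scan, since the incremental path only sees partitions touched since the last clean.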
[jira] [Commented] (HUDI-7211) Relax need of ordering/precombine field for tables with autogenerated record keys for DeltaStreamer
[ https://issues.apache.org/jira/browse/HUDI-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850517#comment-17850517 ]

sivabalan narayanan commented on HUDI-7211:
-------------------------------------------
For auto record key generation, you need to set the operation type to "INSERT". Can you give that a try?

> Relax need of ordering/precombine field for tables with autogenerated record keys for DeltaStreamer
> ---------------------------------------------------------------------------------------------------
> Key: HUDI-7211
> URL: https://issues.apache.org/jira/browse/HUDI-7211
> Project: Apache Hudi
> Issue Type: Bug
> Components: writer-core
> Reporter: Aditya Goenka
> Priority: Critical
> Fix For: 1.1.0
>
> https://github.com/apache/hudi/issues/10233
>
> ```
> NOW=$(date '+%Y%m%dt%H%M%S')
> ${SPARK_HOME}/bin/spark-submit \
>   --jars ${path_prefix}/jars/${SPARK_V}/hudi-spark${SPARK_VERSION}-bundle_2.12-${HUDI_VERSION}.jar \
>   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
>   ${path_prefix}/jars/${SPARK_V}/hudi-utilities-slim-bundle_2.12-${HUDI_VERSION}.jar \
>   --target-base-path ${path_prefix}/testcases/stocks/data/target/${NOW} \
>   --target-table stocks${NOW} \
>   --table-type COPY_ON_WRITE \
>   --base-file-format PARQUET \
>   --props ${path_prefix}/testcases/stocks/configs/hoodie.properties \
>   --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
>   --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
>   --hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=${path_prefix}/testcases/stocks/data/schema_without_ts.avsc \
>   --hoodie-conf hoodie.deltastreamer.schemaprovider.target.schema.file=${path_prefix}/testcases/stocks/data/schema_without_ts.avsc \
>   --op UPSERT \
>   --spark-master yarn \
>   --hoodie-conf hoodie.deltastreamer.source.dfs.root=${path_prefix}/testcases/stocks/data/source_without_ts \
>   --hoodie-conf hoodie.datasource.write.partitionpath.field=date \
>   --hoodie-conf hoodie.datasource.write.keygenerator.type=SIMPLE \
>   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=false \
>   --hoodie-conf hoodie.metadata.enable=true
> ```
[jira] [Assigned] (HUDI-7812) Async Clustering w/ row writer fails due to timetravel query validation
[ https://issues.apache.org/jira/browse/HUDI-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan reassigned HUDI-7812:
-----------------------------------------
Assignee: sivabalan narayanan

> Async Clustering w/ row writer fails due to timetravel query validation
> -----------------------------------------------------------------------
> Key: HUDI-7812
> URL: https://issues.apache.org/jira/browse/HUDI-7812
> Project: Apache Hudi
> Issue Type: Bug
> Components: clustering
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Major
> Labels: pull-request-available
>
> With the clustering row-writer flow enabled, we trigger a time travel query to read the input records. But the query side fails if there are any pending commits (due to new ingestion) whose timestamp is less than the clustering instant time. We need to relax this constraint.
>
> {code:java}
> Failed to execute CLUSTERING service
> java.util.concurrent.CompletionException: org.apache.hudi.exception.HoodieTimeTravelException: Time travel's timestamp '20240406123837295' must be earlier than the first incomplete commit timestamp '20240406123834233'.
>   at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_392-internal]
>   at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_392-internal]
>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ~[?:1.8.0_392-internal]
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_392-internal]
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_392-internal]
>   at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392-internal]
> Caused by: org.apache.hudi.exception.HoodieTimeTravelException: Time travel's timestamp '20240406123837295' must be earlier than the first incomplete commit timestamp '20240406123834233'.
>   at org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf(TimelineUtils.java:369) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>   at org.apache.hudi.HoodieBaseRelation.$anonfun$listLatestFileSlices$1(HoodieBaseRelation.scala:416) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>   at org.apache.hudi.HoodieBaseRelation.$anonfun$listLatestFileSlices$1$adapted(HoodieBaseRelation.scala:416) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>   at scala.Option.foreach(Option.scala:407) ~[scala-library-2.12.17.jar:?]
>   at org.apache.hudi.HoodieBaseRelation.listLatestFileSlices(HoodieBaseRelation.scala:416) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>   at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:225) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>   at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:68) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>   at org.apache.hudi.HoodieBaseRelation.buildScan(HoodieBaseRelation.scala:369) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:323) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:357) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:413) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:356) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
>   at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:323) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
>   at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63) ~[spark-catalyst_2.12-3.2.3.jar:3.2.3]
>   at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.17.jar:?]
>   at scala.collection.Iterator$$anon$11
[jira] [Updated] (HUDI-7812) Async Clustering w/ row writer fails due to timetravel query validation
[ https://issues.apache.org/jira/browse/HUDI-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-7812:
--------------------------------------
Description:
With the clustering row-writer flow enabled, we trigger a time travel query to read the input records. But the query side fails if there are any pending commits (due to new ingestion) whose timestamp is less than the clustering instant time. We need to relax this constraint.

{code:java}
Failed to execute CLUSTERING service
java.util.concurrent.CompletionException: org.apache.hudi.exception.HoodieTimeTravelException: Time travel's timestamp '20240406123837295' must be earlier than the first incomplete commit timestamp '20240406123834233'.
  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_392-internal]
  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_392-internal]
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ~[?:1.8.0_392-internal]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_392-internal]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_392-internal]
  at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392-internal]
Caused by: org.apache.hudi.exception.HoodieTimeTravelException: Time travel's timestamp '20240406123837295' must be earlier than the first incomplete commit timestamp '20240406123834233'.
  at org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf(TimelineUtils.java:369) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
  at org.apache.hudi.HoodieBaseRelation.$anonfun$listLatestFileSlices$1(HoodieBaseRelation.scala:416) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
  at org.apache.hudi.HoodieBaseRelation.$anonfun$listLatestFileSlices$1$adapted(HoodieBaseRelation.scala:416) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
  at scala.Option.foreach(Option.scala:407) ~[scala-library-2.12.17.jar:?]
  at org.apache.hudi.HoodieBaseRelation.listLatestFileSlices(HoodieBaseRelation.scala:416) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
  at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:225) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
  at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:68) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
  at org.apache.hudi.HoodieBaseRelation.buildScan(HoodieBaseRelation.scala:369) ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:323) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:357) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:413) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:356) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:323) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63) ~[spark-catalyst_2.12-3.2.3.jar:3.2.3]
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.17.jar:?]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.17.jar:?]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) ~[scala-library-2.12.17.jar:?]
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) ~[spark-catalyst_2.12-3.2.3.jar:3.2.3]
  at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67) ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78) ~[spark-catalyst_2.12-3.2.3.jar:3.2.3]
  at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) ~[scala-library-2.12.17.jar:?]
  at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) ~[scala-library-2.12.17.jar:?]
  at scala.collection.Iterator.foreach(Iterator.scala:943) ~[scala-library-2.12.17.
[jira] [Created] (HUDI-7812) Async Clustering w/ row writer fails due to timetravel query validation
sivabalan narayanan created HUDI-7812:
--------------------------------------
Summary: Async Clustering w/ row writer fails due to timetravel query validation
Key: HUDI-7812
URL: https://issues.apache.org/jira/browse/HUDI-7812
Project: Apache Hudi
Issue Type: Bug
Components: clustering
Reporter: sivabalan narayanan

With the clustering row-writer flow enabled, we trigger a time travel query to read the input records. But the query side fails if there are any pending commits (due to new ingestion) whose timestamp is less than the clustering instant time. We need to relax this constraint.
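A minimal sketch of the failing check and the proposed relaxation, in illustrative Python. This does not mirror the real `TimelineUtils.validateTimestampAsOf` signature, and the `allow_pending` flag is a hypothetical way to express "skip the check for internal reads such as clustering's input scan".

```python
def validate_timestamp_as_of(ts: str, pending_instants, allow_pending: bool = False) -> None:
    """Raise if the time travel timestamp is not earlier than the first
    incomplete (pending) commit; a hypothetical allow_pending flag relaxes
    the constraint for internal reads like the clustering input scan."""
    if allow_pending or not pending_instants:
        return
    first_pending = min(pending_instants)   # instant strings sort chronologically
    if ts >= first_pending:
        raise ValueError(
            f"Time travel's timestamp '{ts}' must be earlier than the "
            f"first incomplete commit timestamp '{first_pending}'.")
```

With the instants from the stack trace, `validate_timestamp_as_of("20240406123837295", ["20240406123834233"])` raises, which is exactly the failure clustering hits when a newer pending ingestion commit exists.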
(hudi) branch branch-0.x updated: [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde (#11356)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/branch-0.x by this push: new bd26797c8e6 [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde (#11356) bd26797c8e6 is described below commit bd26797c8e651694b31bdf614f55c9fc6a757f71 Author: Y Ethan Guo AuthorDate: Wed May 29 07:53:40 2024 -0700 [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde (#11356) * [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde * Revert changes in HoodieBaseRelation --- .../test/java/org/apache/hudi/ColumnStatsIndexHelper.java| 7 +++ .../parquet/Spark30LegacyHoodieParquetFileFormat.scala | 12 ++-- .../parquet/Spark31LegacyHoodieParquetFileFormat.scala | 12 ++-- .../parquet/Spark32LegacyHoodieParquetFileFormat.scala | 12 ++-- .../parquet/Spark33LegacyHoodieParquetFileFormat.scala | 12 ++-- .../parquet/Spark34LegacyHoodieParquetFileFormat.scala | 12 ++-- .../parquet/Spark35LegacyHoodieParquetFileFormat.scala | 12 ++-- 7 files changed, 39 insertions(+), 40 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java index 357200f5f0e..269a83bf7ac 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java @@ -21,9 +21,7 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieStorage; -import 
org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; import org.apache.hudi.util.JavaScalaConverters; @@ -51,6 +49,7 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructType$; import org.apache.spark.sql.types.TimestampType; +import org.apache.spark.util.SerializableConfiguration; import javax.annotation.Nonnull; @@ -163,7 +162,7 @@ public class ColumnStatsIndexHelper { .map(StructField::name) .collect(Collectors.toList()); -StorageConfiguration storageConf = HadoopFSUtils.getStorageConfWithCopy(sc.hadoopConfiguration()); +SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration()); int numParallelism = (baseFilesPaths.size() / 3 + 1); String previousJobDescription = sc.getLocalProperty("spark.job.description"); @@ -178,7 +177,7 @@ public class ColumnStatsIndexHelper { Iterable iterable = () -> paths; return StreamSupport.stream(iterable.spliterator(), false) .flatMap(path -> { - HoodieStorage storage = new HoodieHadoopStorage(path, storageConf); + HoodieStorage storage = new HoodieHadoopStorage(path, serializableConfiguration.value()); return utils.readColumnStatsFromMetadata( storage, new StoragePath(path), diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala index bf6e222b763..59fde4af02f 100644 --- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala +++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala @@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.util.InternalSchemaCache import org.apache.hudi.common.util.StringUtils.isNullOrEmpty import org.apache.hudi.common.util.collection.Pair -import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.action.InternalSchemaMerger import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} @@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, Part
(hudi) branch master updated (014f2d789dd -> 4a530837f1b)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git from 014f2d789dd [MINOR] Archive operation only releases lock when holds it (#11160) add 4a530837f1b [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde (#11355) No new revisions were added by this update. Summary of changes: .../HoodieFileGroupReaderBasedParquetFileFormat.scala| 12 +++- .../test/java/org/apache/hudi/ColumnStatsIndexHelper.java| 7 +++ .../parquet/Spark30LegacyHoodieParquetFileFormat.scala | 12 ++-- .../parquet/Spark31LegacyHoodieParquetFileFormat.scala | 12 ++-- .../parquet/Spark32LegacyHoodieParquetFileFormat.scala | 12 ++-- .../parquet/Spark33LegacyHoodieParquetFileFormat.scala | 12 ++-- .../parquet/Spark34LegacyHoodieParquetFileFormat.scala | 12 ++-- .../parquet/Spark35LegacyHoodieParquetFileFormat.scala | 12 ++-- 8 files changed, 46 insertions(+), 45 deletions(-)
[jira] [Updated] (HUDI-7779) Guarding archival to not archive unintended commits
[ https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7779: -- Description: Archiving commits from the active timeline could lead to data consistency issues on rare occasions. We should come up with proper guards to ensure we do not perform such unintended archival. The major gap we want to guard against: if someone disables the cleaner, archival should account for data consistency issues and bail out. We have a base guarding condition, where archival will stop at the earliest commit to retain based on the latest clean commit metadata. But there are a few other scenarios that need to be accounted for. a. Keeping aside replace commits, let's dive into the specifics for regular commits and delta commits. Say the user configured clean commits retained to 4 and the archival configs to 5 and 6. After t10, the cleaner is supposed to clean up all file versions created at or before t6. Say the cleaner did not run (for whatever reason) for the next 5 commits. Archival will certainly be guarded until the earliest commit to retain based on the latest clean commit. Corner case to consider: a savepoint was added at, say, t3 and later removed, and the cleaner was never re-enabled. Even though archival would have stopped at t3 (while the savepoint was present), once the savepoint is removed, if archival is executed, it could archive commit t3. That means the file versions tracked at t3 are still not cleaned up by the cleaner. Reasoning: we are good here w.r.t. data consistency. Until the cleaner next runs, these older file versions might be exposed to the end user. But time travel queries are not intended for already-cleaned-up commits, so this is not an issue. None of snapshot, time travel, or incremental queries will run into issues, as they are not supposed to poll for t3. At any later point, if the cleaner is re-enabled, it will take care of cleaning up the file versions tracked at the t3 commit. 
Just that, for the interim period, some older file versions might still be exposed to readers. b. The trickier part is when replace commits are involved. Since the replace commit metadata in the active timeline is what ensures the replaced file groups are ignored for reads, before archiving it, the cleaner is expected to clean those file groups up fully. But are there chances this could go wrong? Corner case to consider: let's add onto the above scenario, where t3 has a savepoint, and t4 is a replace commit which replaced file groups tracked in t3. The cleaner will skip cleaning up files tracked by t3 (due to the presence of the savepoint), but will clean up t4, t5 and t6. So, the earliest commit to retain will be pointing to t6. And say the savepoint for t3 is removed, but the cleaner was disabled. In this state of the timeline, if archival is executed (since t3's savepoint is removed), archival might archive t3 and t4.rc. This could lead to data duplicates, as both the replaced file groups and the new file groups from t4.rc would be exposed as valid file groups. In other words, to summarize the different scenarios: i. The replaced file group is never cleaned up — ECTR (earliest commit to retain) is less than this rc, and we are good. ii. The replaced file group is cleaned up — ECTR is > this rc, and it is good to archive. iii. Tricky: ECTR moved ahead compared to this rc, but due to the savepoint, full clean-up did not happen. After the savepoint is removed, when archival is executed, we should avoid archiving the rc of interest. This is the gap we don't account for as of now. We have 3 options to solve this. Option A: Let the savepoint deletion flow take care of cleaning up the files it is tracking. Cons: the savepoint's responsibility is not removing data files, so from a single-responsibility standpoint, this may not be right. Also, this clean-up might need to do what a clean planner would actually be doing, i.e. 
build the file system view, understand whether it was supposed to be cleaned up already, and only then clean up the files which are supposed to be cleaned up. For example, if a file group has only one file slice, it should not be cleaned up — and scenarios like that. Option B: Since archival is the one which might cause data consistency issues, why not have archival do the clean-up? We need to account for concurrent cleans, failure and retry scenarios, etc. Also, we might need to build the file system view and then decide whether something needs to be cleaned up before archiving it. Cons: again, the single-responsibility rule might be broken. It would be neat if the cleaner takes care of deleting data files and archival only takes care of deleting/archiving timeline files. Option C: Similar to how the cleaner maintains EarliestCommitToRetain, let the cleaner track another piece of metadata named "EarliestCommitToArchive". Strictly speaking, ear
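The three scenarios above (i/ii/iii) condense into a small guard predicate. The following is a minimal illustrative sketch, not Hudi code: instant times are modeled as plain integers, and `replaced_groups_cleaned` is a hypothetical flag standing in for what would really be a file-system-view check of whether the replaced file groups still exist.

```python
# Illustrative guard (assumed names, not the Hudi archival implementation):
# a replace commit is safe to archive only when ECTR has moved past it AND
# its replaced file groups were actually cleaned up. Scenario iii — ECTR
# moved ahead but a (since-removed) savepoint blocked the cleanup — is the
# gap described in HUDI-7779.

def can_archive_replace_commit(rc_ts, ectr_ts, replaced_groups_cleaned):
    if ectr_ts <= rc_ts:
        # Scenario i: ECTR has not moved past this replace commit yet.
        return False
    if replaced_groups_cleaned:
        # Scenario ii: replaced file groups are gone; archiving is safe.
        return True
    # Scenario iii: ECTR is ahead, but the replaced file groups survive
    # (savepoint blocked the clean). Archiving here would expose both the
    # replaced and the replacing file groups as valid -> data duplicates.
    return False
```

Option C above amounts to making this predicate cheap to evaluate: the cleaner would persist an "EarliestCommitToArchive" so archival never has to rebuild the file system view itself.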
[jira] [Updated] (HUDI-7655) Support configuration for clean to fail execution if at least one file is marked as a failed delete
[ https://issues.apache.org/jira/browse/HUDI-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7655: -- Fix Version/s: 1.0.0 > Support configuration for clean to fail execution if at least one > file is marked as a failed delete > > > Key: HUDI-7655 > URL: https://issues.apache.org/jira/browse/HUDI-7655 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Krishen Bhan > Assignee: sivabalan narayanan >Priority: Minor > Labels: clean, pull-request-available > Fix For: 1.0.0 > > > When a HUDI clean plan is executed, any targeted file that was not confirmed > as deleted (or non-existing) will be marked as a "failed delete". Although > these failed deletes will be added to `.clean` metadata, if incremental clean > is used then these files might not ever be picked up again by a future clean > plan, unless a "full-scan" clean ends up being scheduled. In addition to > more files unnecessarily taking up storage space for longer, they > can lead to the following dataset consistency issue for COW datasets: > # Insert at C1 creates file group f1 in partition > # Replacecommit at RC2 creates file group f2 in partition, and replaces f1 > # Any reader of the partition that calls the HUDI API (with or without using MDT) > will recognize that f1 should be ignored, as it has been replaced. This is > since the RC2 instant file is in the active timeline > # Some completed instants later, an incremental clean is scheduled. It moves > the "earliest commit to retain" to a time after instant time RC2, so it > targets f1 for deletion. But during execution of the plan, it fails to delete > f1. > # An archive job is eventually triggered, and archives C1 and RC2. 
Note that > f1 is still in partition > At this point, any job/query that reads the aforementioned partition directly > from the DFS file system calls (without directly using MDT FILES partition) > will consider both f1 and f2 as valid file groups, since RC2 is no longer in > active timeline. This is a data consistency issue, and will only be resolved > if a "full-scan" clean is triggered and deletes f1. > This specific scenario can be avoided if the user can configure HUDI clean to > fail execution of a clean plan unless all files are confirmed as deleted (or > not existing in DFS already), "blocking" the clean. The next clean attempt > will re-execute this existing plan, since clean plans cannot be "rolled > back". -- This message was sent by Atlassian Jira (v8.20.10#820010)
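The "blocking" clean behavior proposed above can be sketched as follows. This is a minimal illustration under assumed names — `execute_clean_plan` and `delete_fn` are hypothetical, not the Hudi cleaner API: the execution fails unless every targeted file is confirmed deleted (or already absent), so the same plan is re-executed on the next clean attempt rather than being silently completed with leftovers like f1.

```python
# Sketch (assumed names, not Hudi code) of a clean execution that "blocks"
# on failed deletes. delete_fn(path) returns True if the file was deleted
# or did not exist; False means the delete could not be confirmed.

def execute_clean_plan(files_to_delete, delete_fn, fail_on_failed_delete=True):
    failed = []
    for path in files_to_delete:
        if not delete_fn(path):
            failed.append(path)
    if failed and fail_on_failed_delete:
        # Fail the clean so the existing plan is retried on the next attempt
        # (clean plans cannot be rolled back).
        raise RuntimeError(f"Clean blocked: failed delete(s): {failed}")
    # Legacy behavior: record failed deletes in .clean metadata and move on.
    return failed
```

With `fail_on_failed_delete=True`, the scenario in the issue cannot progress to archival with f1 still on storage, because the clean never completes until f1 is confirmed gone.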
[jira] [Created] (HUDI-7807) spark-sql updates for a pk less table fails w/ partitioned table
sivabalan narayanan created HUDI-7807: - Summary: spark-sql updates for a pk less table fails w/ partitioned table Key: HUDI-7807 URL: https://issues.apache.org/jira/browse/HUDI-7807 Project: Apache Hudi Issue Type: Bug Components: spark-sql Reporter: sivabalan narayanan quick start fails when trying to UPDATE with spark-sql for a pk less table. {code:java} > UPDATE hudi_table4 SET fare = 25.0 WHERE rider = 'rider-D'; 24/05/28 11:44:41 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'. 24/05/28 11:44:41 ERROR SparkSQLDriver: Failed in [UPDATE hudi_table4 SET fare = 25.0 WHERE rider = 'rider-D'] org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.keygen.SimpleKeyGenerator at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75) at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:123) at org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:91) at org.apache.hudi.util.SparkKeyGenUtils$.getPartitionColumns(SparkKeyGenUtils.scala:47) at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:218) at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232) at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106) 
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:
[jira] [Updated] (HUDI-7807) spark-sql updates for a pk less table fails w/ partitioned table
[ https://issues.apache.org/jira/browse/HUDI-7807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7807: -- Fix Version/s: 0.15.0 1.0.0 > spark-sql updates for a pk less table fails w/ partitioned table > - > > Key: HUDI-7807 > URL: https://issues.apache.org/jira/browse/HUDI-7807 > Project: Apache Hudi > Issue Type: Bug > Components: spark-sql >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > Fix For: 0.15.0, 1.0.0 > > > quick start fails when trying to UPDATE with spark-sql for a pk less table. > > {code:java} > > UPDATE hudi_table4 SET fare = 25.0 WHERE rider = 'rider-D'; > 24/05/28 11:44:41 WARN package: Truncated the string representation of a plan > since it was too large. This behavior can be adjusted by setting > 'spark.sql.debug.maxToStringFields'. > 24/05/28 11:44:41 ERROR SparkSQLDriver: Failed in [UPDATE hudi_table4 SET > fare = 25.0 WHERE rider = 'rider-D'] > org.apache.hudi.exception.HoodieException: Unable to instantiate class > org.apache.hudi.keygen.SimpleKeyGenerator > at > org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75) > at > org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:123) > at > org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:91) > at > org.apache.hudi.util.SparkKeyGenUtils$.getPartitionColumns(SparkKeyGenUtils.scala:47) > at > org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:218) > at > org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232) > at > org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187) > at > org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125) > at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168) > at > 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) > at > org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) > at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) > at > org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110) > at > org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106) > at > org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) > at > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) > at > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) > at > 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457) > at > org.apache.spark.sql.exe
[jira] [Assigned] (HUDI-7807) spark-sql updates for a pk less table fails w/ partitioned table
[ https://issues.apache.org/jira/browse/HUDI-7807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7807: - Assignee: sivabalan narayanan > spark-sql updates for a pk less table fails w/ partitioned table > - > > Key: HUDI-7807 > URL: https://issues.apache.org/jira/browse/HUDI-7807 > Project: Apache Hudi > Issue Type: Bug > Components: spark-sql >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > > quick start fails when trying to UPDATE with spark-sql for a pk less table. > > {code:java} > > UPDATE hudi_table4 SET fare = 25.0 WHERE rider = 'rider-D'; > 24/05/28 11:44:41 WARN package: Truncated the string representation of a plan > since it was too large. This behavior can be adjusted by setting > 'spark.sql.debug.maxToStringFields'. > 24/05/28 11:44:41 ERROR SparkSQLDriver: Failed in [UPDATE hudi_table4 SET > fare = 25.0 WHERE rider = 'rider-D'] > org.apache.hudi.exception.HoodieException: Unable to instantiate class > org.apache.hudi.keygen.SimpleKeyGenerator > at > org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75) > at > org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:123) > at > org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:91) > at > org.apache.hudi.util.SparkKeyGenUtils$.getPartitionColumns(SparkKeyGenUtils.scala:47) > at > org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:218) > at > org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232) > at > org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187) > at > org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125) > at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168) > at > 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) > at > org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) > at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) > at > org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110) > at > org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106) > at > org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) > at > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) > at > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) > at > 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457) > at > org.apache.spark.sql.execution.QueryExecution.eagerlyE
[jira] [Created] (HUDI-7800) Remove usages of instant time with HoodieRecordLocation
sivabalan narayanan created HUDI-7800: - Summary: Remove usages of instant time with HoodieRecordLocation Key: HUDI-7800 URL: https://issues.apache.org/jira/browse/HUDI-7800 Project: Apache Hudi Issue Type: Improvement Components: index Reporter: sivabalan narayanan HoodieRecordLocation has a reference to the instant time. Strictly speaking, partitionPath and fileId are what matter; the instant time should not. It is used in other places, like HBase, to account for rollbacks. At least equals() in HoodieRecordLocation could ignore the instant time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
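The proposed equals() semantics can be illustrated with a small sketch — a hypothetical `RecordLocation` type, not the actual HoodieRecordLocation class: equality and hashing consider only the partition path and fileId, while the instant time is still carried along but excluded from comparison.

```python
from dataclasses import dataclass, field

# Hypothetical model of the idea in HUDI-7800: two locations pointing at the
# same file group (partition path + fileId) compare equal even if they were
# tagged with different instant times. compare=False excludes the field from
# the generated __eq__ and __hash__.
@dataclass(frozen=True)
class RecordLocation:
    partition_path: str
    file_id: str
    instant_time: str = field(default="", compare=False)
```

In the Java codebase this would correspond to dropping the instant-time term from `equals()`/`hashCode()`, keeping the field only where rollback handling (e.g. the HBase index) still needs it.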
(hudi) branch master updated: Bump h2 in /hudi-platform-service/hudi-metaserver (#9147)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new a0d60d98976 Bump h2 in /hudi-platform-service/hudi-metaserver (#9147) a0d60d98976 is described below commit a0d60d98976d53958e1e04352d2b3b714c395e9c Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> AuthorDate: Fri May 24 18:23:14 2024 -0700 Bump h2 in /hudi-platform-service/hudi-metaserver (#9147) Bumps [h2](https://github.com/h2database/h2database) from 1.4.200 to 2.2.220. - [Release notes](https://github.com/h2database/h2database/releases) - [Commits](https://github.com/h2database/h2database/compare/version-1.4.200...version-2.2.220) --- updated-dependencies: - dependency-name: com.h2database:h2 dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- hudi-platform-service/hudi-metaserver/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-platform-service/hudi-metaserver/pom.xml b/hudi-platform-service/hudi-metaserver/pom.xml index 38b25533fe5..b42a229fbd0 100644 --- a/hudi-platform-service/hudi-metaserver/pom.xml +++ b/hudi-platform-service/hudi-metaserver/pom.xml @@ -32,7 +32,7 @@ ${project.parent.basedir} -1.4.200 +2.2.220 /usr/local docker
(hudi) branch master updated (660c6c7a5b0 -> 70a04c6c782)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git from 660c6c7a5b0 [MINOR] Fix the look up warning msg (#11286) add 70a04c6c782 Bump h2 in /packaging/hudi-metaserver-server-bundle (#9148) No new revisions were added by this update. Summary of changes: packaging/hudi-metaserver-server-bundle/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[jira] [Updated] (HUDI-7779) Guarding archival to not archive unintended commits
[ https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7779: -- Description: Archiving commits from active timeline could lead to data consistency issues on some rarest of occasions. We should come up with proper guards to ensure we do not make such unintended archival. Major gap which we wanted to guard is: if someone disabled cleaner, archival should account for data consistency issues and ensure it bails out. We have a base guarding condition, where archival will stop at the earliest commit to retain based on latest clean commit metadata. But there are few other scenarios that needs to be accounted for. a. Keeping aside replace commits, lets dive into specifics for regular commits and delta commits. Say user configured clean commits to 4 and archival configs to 5 and 6. after t10, cleaner is supposed to clean up all file versions created at or before t6. Say cleaner did not run(for whatever reason for next 5 commits). Archival will certainly be guarded until earliest commit to retain based on latest clean commits. Corner case to consider: A savepoint was added to say t3 and later removed. and still the cleaner was never re-enabled. Even though archival would have been stopped at t3 (until savepoint is present),but once savepoint is removed, if archival is executed, it could archive commit t3. Which means, file versions tracked at t3 is still not yet cleaned by the cleaner. Reasoning: We are good here wrt data consistency. Up until cleaner runs next time, this older file versions might be exposed to the end-user. But time travel query is not intended for already cleaned up commits and hence this is not an issue. None of snapshot, time travel query or incremental query will run into issues as they are not supposed to poll for t3. At any later point, if cleaner is re-enabled, it will take care of cleaning up file versions tracked at t3 commit. 
Just that for interim period, some older file versions might still be exposed to readers. b. The more tricky part is when replace commits are involved. Since replace commit metadata in active timeline is what ensures the replaced file groups are ignored for reads, before archiving the same, cleaner is expected to clean them up fully. But are there chances that this could go wrong? Corner case to consider. Lets add onto above scenario, where t3 has a savepoint, and t4 is a replace commit which replaced file groups tracked in t3. Cleaner will skip cleaning up files tracked by t3(due to the presence of savepoint), but will clean up t4, t5 and t6. So, earliest commit to retain will be pointing to t6. And say savepoint for t3 is removed, but cleaner was disabled. In this state of the timeline, if archival is executed, (since t3.savepoint is removed), archival might archive t3 and t4.rc. This could lead to data duplicates as both replaced file groups and new file groups from t4.rc would be exposed as valid file groups. In other words, if we were to summarize the different scenarios: i. replaced file group is never cleaned up. - ECTR(Earliest commit to retain) is less than this.rc and we are good. ii. replaced file group is cleaned up. - ECTR is > this.rc and is good to archive. iii. tricky: ECTR moved ahead compared to this.rc, but due to savepoint, full clean up did not happen. After savepoint is removed, and when archival is executed, we should avoid archiving the rc of interest. This is the gap we don't account for as of now. We have 3 options to go about to solve this. Option A: Let Savepoint deletion flow take care of cleaning up the files its tracking. cons: Savepoint's responsibility is not removing any data files. So, from a single user responsibility rule, this may not be right. Also, this clean up might need to do what a clean planner might actually be doing. ie. 
build the file system view, determine whether the files were supposed to be cleaned up already, and only then clean up the files that qualify. For example, if a file group has only one file slice, it should not be cleaned up, and similar scenarios apply.

Option B: Since archival is what might cause the data consistency issues, why not have archival do the clean-up? We need to account for concurrent cleans, failure and retry scenarios, etc. Also, we might need to build the file system view and then decide whether something needs to be cleaned up before archiving it.
Cons: Again, the single-responsibility rule might be broken. It would be neat if the cleaner took care of deleting data files and archival only took care of deleting/archiving timeline files.

Option C: Similar to how the cleaner maintains EarliestCommitToRetain, let the cleaner track another piece of metadata named "EarliestCommitToArchive". Strictly speaking, ear
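The three ECTR scenarios above can be sketched as a small guard function. This is a hypothetical illustration, not Hudi's actual archival code: the names `ArchivalGuard`, `canArchiveReplaceCommit`, and `fullyCleanedReplaceCommits` are invented for this sketch, and commit timestamps are compared as plain strings.

```java
import java.util.Set;

// Hypothetical sketch of the archival guard discussed above -- NOT Hudi's
// actual API. Timestamps like "t3", "t4" compare lexicographically, which
// works for the single-digit examples used in this discussion.
public class ArchivalGuard {

  /**
   * Decide whether a replace commit can be archived safely.
   *
   * @param replaceCommitTs            timestamp of the replace commit ("this.rc")
   * @param earliestCommitToRetain     ECTR from the latest clean commit metadata
   * @param fullyCleanedReplaceCommits replace commits whose replaced file groups
   *                                   the cleaner has positively confirmed as removed
   */
  public static boolean canArchiveReplaceCommit(String replaceCommitTs,
                                                String earliestCommitToRetain,
                                                Set<String> fullyCleanedReplaceCommits) {
    // Scenario i: ECTR has not moved past this.rc -- the replaced file groups
    // may not have been cleaned yet; do not archive.
    if (earliestCommitToRetain.compareTo(replaceCommitTs) <= 0) {
      return false;
    }
    // Scenarios ii vs. iii: ECTR is past this.rc, but that alone is not enough,
    // because a savepoint may have forced the cleaner to skip this commit.
    // Only archive when the clean-up is positively confirmed (scenario ii).
    return fullyCleanedReplaceCommits.contains(replaceCommitTs);
  }
}
```

With the timeline from the corner case (savepoint on t3 removed, t4.rc skipped by the cleaner, ECTR at t6), such a guard would keep t4.rc in the active timeline until the cleaner confirms the replaced file groups are actually gone.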
[jira] [Assigned] (HUDI-7779) Guarding archival to not archive unintended commits
[ https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7779:
-
Assignee: sivabalan narayanan

> Guarding archival to not archive unintended commits
> ---
>
> Key: HUDI-7779
> URL: https://issues.apache.org/jira/browse/HUDI-7779
> Project: Apache Hudi
> Issue Type: Bug
> Components: archiving
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Major
[jira] [Commented] (HUDI-7779) Guarding archival to not archive unintended commits
[ https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849139#comment-17849139 ] sivabalan narayanan commented on HUDI-7779:
---
Hey Sagar, I updated the Jira description with more details. Can you check it out?

> Guarding archival to not archive unintended commits
> ---
>
> Key: HUDI-7779
> URL: https://issues.apache.org/jira/browse/HUDI-7779
> Project: Apache Hudi
> Issue Type: Bug
> Components: archiving
> Reporter: sivabalan narayanan
> Priority: Major
(hudi) branch branch-0.x updated: [HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11270)
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 2e39b41be07  [HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11270)
2e39b41be07 is described below

commit 2e39b41be07d42c0d41fd2cf765732e592954466
Author: Y Ethan Guo
AuthorDate: Wed May 22 15:27:48 2024 -0700

    [HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11270)
---
 .../apache/spark/HoodieSparkKryoRegistrar.scala    |  6 +-
 .../apache/spark/TestHoodieSparkKryoRegistrar.java | 86 ++
 2 files changed, 89 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
index a8650e5668a..eba3999ea57 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.client.model.HoodieInternalRow
 import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
 import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.storage.StorageConfiguration
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.serializers.JavaSerializer
@@ -64,8 +64,8 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist
     // Hadoop's configuration is not a serializable object by itself, and hence
     // we're relying on [[SerializableConfiguration]] wrapper to work it around.
     // We cannot remove this entry; otherwise the ordering is changed.
-    // So we replace it with [[StorageConfiguration]].
-    kryo.register(classOf[StorageConfiguration[_]], new JavaSerializer())
+    // So we replace it with [[HadoopStorageConfiguration]] for Spark.
+    kryo.register(classOf[HadoopStorageConfiguration], new JavaSerializer())
   }

 /**
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java
new file mode 100644
index 000..4dd297a02b6
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link HoodieSparkKryoRegistrar}
+ */
+public class TestHoodieSparkKryoRegistrar {
+  @Test
+  public void testSerdeHoodieHadoopConfiguration() {
+    Kryo kryo = newKryo();
+
+    HadoopStorageConfiguration conf = new HadoopStorageConfiguration(new Configuration());
+
+    // Serialize
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Output output = new Output(baos);
+    kryo.writeObject(output, conf);
+    output.close();
+
+    // Deserialize
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    Input input = new Input(bais);
+    HadoopStorageConfiguration deserialized = kryo.readObject(input, HadoopStorageConfiguration.class);
+    input.close();
+
+    // Verify
+    assertEquals(getPropsInMap(conf), getPropsInMap(deserialized));
+  }
+
+  private Kryo newKryo() {
(hudi) branch master updated: [HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11269)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new e1aa1bcb4af [HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11269) e1aa1bcb4af is described below commit e1aa1bcb4af5cbcad9e985d0333eb5275e128e5b Author: Y Ethan Guo AuthorDate: Wed May 22 15:27:03 2024 -0700 [HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11269) --- .../apache/spark/HoodieSparkKryoRegistrar.scala| 6 +- .../apache/spark/TestHoodieSparkKryoRegistrar.java | 86 ++ 2 files changed, 89 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala index a8650e5668a..eba3999ea57 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala @@ -22,7 +22,7 @@ import org.apache.hudi.client.model.HoodieInternalRow import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord} import org.apache.hudi.common.util.HoodieCommonKryoRegistrar import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.storage.StorageConfiguration +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration import com.esotericsoftware.kryo.io.{Input, Output} import com.esotericsoftware.kryo.serializers.JavaSerializer @@ -64,8 +64,8 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist // Hadoop's configuration is not a serializable object by itself, and hence // we're relying on [[SerializableConfiguration]] wrapper to work it around. // We cannot remove this entry; otherwise the ordering is changed. -// So we replace it with [[StorageConfiguration]]. 
-kryo.register(classOf[StorageConfiguration[_]], new JavaSerializer()) +// So we replace it with [[HadoopStorageConfiguration]] for Spark. +kryo.register(classOf[HadoopStorageConfiguration], new JavaSerializer()) } /** diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java new file mode 100644 index 000..4dd297a02b6 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark; + +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; +import org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests {@link HoodieSparkKryoRegistrar} + */ +public class TestHoodieSparkKryoRegistrar { + @Test + public void testSerdeHoodieHadoopConfiguration() { +Kryo kryo = newKryo(); + +HadoopStorageConfiguration conf = new HadoopStorageConfiguration(new Configuration()); + +// Serialize +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +Output output = new Output(baos); +kryo.writeObject(output, conf); +output.close(); + +// Deserialize +ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); +Input input = new Input(bais); +HadoopStorageConfiguration deserialized = kryo.readObject(input, HadoopStorageConfiguration.class); +input.close(); + +// Verify +assertEquals(getPropsInMap(conf), getPropsInMap(deserialized)); + } + + private Kryo newKryo() { +
[jira] [Created] (HUDI-7780) Avoid 0 record parquet files
sivabalan narayanan created HUDI-7780: - Summary: Avoid 0 record parquet files Key: HUDI-7780 URL: https://issues.apache.org/jira/browse/HUDI-7780 Project: Apache Hudi Issue Type: Improvement Components: writer-core Reporter: sivabalan narayanan There are occasions where Hudi could produce 0-record files. For example: a. the entire set of records is deleted in a log block and, due to small file handling, a new parquet file is created with HoodieMergeHandle; b. during compaction, there are again chances that Hudi might produce 0-record parquet files. We need to avoid such files if possible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
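The two cases above reduce to a simple guard: skip creating the base file when no records survive the merge or compaction. A minimal sketch of that idea, with hypothetical names (`writeMergedFile` is illustrative, not Hudi's actual write-handle API):

```java
import java.util.List;
import java.util.Optional;

// Illustrative sketch only: bail out of base-file creation when the merge
// leaves zero surviving records, instead of writing a 0-record parquet file.
public class ZeroRecordFileGuard {
  static Optional<String> writeMergedFile(List<String> survivingRecords, String fileName) {
    if (survivingRecords.isEmpty()) {
      // No records left after applying deletes: create no file at all.
      return Optional.empty();
    }
    // ... real parquet writing would happen here ...
    return Optional.of(fileName);
  }

  public static void main(String[] args) {
    // All records deleted: no file is produced.
    System.out.println(ZeroRecordFileGuard.writeMergedFile(List.of(), "f1.parquet").isPresent());
    // At least one record survives: the file is produced.
    System.out.println(ZeroRecordFileGuard.writeMergedFile(List.of("r1"), "f2.parquet").isPresent());
  }
}
```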
[jira] [Updated] (HUDI-7779) Guarding archival to not archive unintended commits
[ https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7779: -- Description: Archiving commits from the active timeline could lead to data consistency issues on rare occasions. We should come up with proper guards to ensure we do not perform such unintended archival. The major gap we want to guard against is: if someone disabled the cleaner, archival should account for data consistency issues and ensure it bails out. We have a base guarding condition, where archival will stop at the earliest commit to retain based on the latest clean commit metadata. But there are a few other scenarios that need to be accounted for. a. Keeping aside replace commits, let's dive into specifics for regular commits and delta commits. Say the user configured clean commits retained to 4 and archival configs to 5 and 6. After t10, the cleaner is supposed to clean up all file versions created at or before t6. Say the cleaner did not run (for whatever reason) for the next 5 commits. Archival will certainly be guarded up to the earliest commit to retain based on the latest clean commit. Corner case to consider: a savepoint was added to, say, t3 and later removed, and the cleaner was still never re-enabled. Even though archival would have been stopped at t3 (while the savepoint is present), once the savepoint is removed, if archival is executed, it could archive commit t3. This means the file versions tracked at t3 are still not yet cleaned by the cleaner. Reasoning: We are good here w.r.t. data consistency. Until the cleaner runs next, these older file versions might be exposed to the end user. But time travel queries are not intended for already cleaned up commits, and hence this is not an issue. None of snapshot, time travel, or incremental queries will run into issues, as they are not supposed to poll for t3. At any later point, if the cleaner is re-enabled, it will take care of cleaning up file versions tracked at commit t3. b. 
The trickier part is when replace commits are involved. Since the replace commit metadata in the active timeline is what ensures the replaced file groups are ignored for reads, before archiving it, the cleaner is expected to clean them up fully. But are there chances this could go wrong? Corner case to consider: let's add on to the above scenario, where t3 has a savepoint, and t4 is a replace commit which replaced file groups tracked in t3. The cleaner will skip cleaning up files tracked by t3, but will clean up t4, t5 and t6. So the earliest commit to retain will be pointing to t6. And say the savepoint for t3 is removed, but the cleaner is disabled. In this state of the timeline, if archival is executed (since t3's savepoint is removed), archival might archive t3 and t4.rc. This could lead to data duplicates, as both the replaced file groups and the new file groups from t4.rc would be exposed as valid file groups. In other words, to summarize the different scenarios: i. The replaced file group is never cleaned up: ECTR (earliest commit to retain) is less than this.rc and we are good. ii. The replaced file group is cleaned up: ECTR is > this.rc and it is good to archive. iii. Tricky: ECTR moved ahead of this.rc, but due to a savepoint, full cleanup did not happen. After the savepoint is removed, when archival is executed, we should avoid archiving the rc of interest. This is the gap we don't account for as of now. We have 2 options to solve this. *Option A:* Before the archiver archives any replace commit, explicitly check that all replaced file groups are fully deleted. Cons: might need FileSystemView polling, which could be costly. *Option B:* The cleaner also tracks additional metadata named "fully cleaned up file groups" at the end of clean planning and in the completed clean commit metadata. 
So, instead of polling the FileSystemView (which might be costly), archival can check the clean commit metadata for the list of file groups and deduce whether all file groups replaced by X.rc are fully deleted. Pros: since the clean planner polls the file system view anyway and already has all file group info, no additional work might be required to deduce the "fully cleaned up file groups"; it just needs to add an additional piece of metadata. was: Archiving commits from active timeline could lead to data consistency issues on some rarest of occasions. We should come up with proper guards to ensure we do not make such unintended archival. Major gap which we wanted to guard is: if someone disabled cleaner, archival should account for data consistency issues and ensure it bails out. We have a base guarding condition, where archival will stop at the earliest commit to retain based on latest clean commit metadata. But there are few other scenarios that needs to be accounted for
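The Option B check described above can be sketched as a set-containment test: a replace commit is archivable only when every file group it replaced appears in the cleaner's recorded "fully cleaned up file groups". All names below are hypothetical, illustrative stand-ins for the proposed metadata, not Hudi's actual API:

```java
import java.util.Map;
import java.util.Set;

// Hedged sketch of the proposed "Option B" archival guard: before archiving a
// replace commit, verify that every file group it replaced has been fully
// deleted per the cleaner's completed clean commit metadata.
public class ReplaceCommitArchivalGuard {
  // fileGroupsReplacedBy: replace commit instant -> file groups it replaced.
  // fullyCleanedFileGroups: union of "fully cleaned up file groups" across clean metadata.
  static boolean canArchiveReplaceCommit(String replaceInstant,
                                         Map<String, Set<String>> fileGroupsReplacedBy,
                                         Set<String> fullyCleanedFileGroups) {
    Set<String> replaced = fileGroupsReplacedBy.getOrDefault(replaceInstant, Set.of());
    // Archive only if the cleaner has fully deleted every replaced file group.
    return fullyCleanedFileGroups.containsAll(replaced);
  }

  public static void main(String[] args) {
    Map<String, Set<String>> replacedBy = Map.of("t4", Set.of("fg-1", "fg-2"));
    // Savepoint on t3 blocked cleanup of fg-2, so t4.rc must not be archived yet.
    System.out.println(canArchiveReplaceCommit("t4", replacedBy, Set.of("fg-1")));
    // Once both replaced file groups are fully cleaned, t4.rc is safe to archive.
    System.out.println(canArchiveReplaceCommit("t4", replacedBy, Set.of("fg-1", "fg-2")));
  }
}
```

This mirrors scenario iii above: the savepoint delays cleanup, so the guard keeps the replace commit in the active timeline until the cleaner catches up.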
[jira] [Created] (HUDI-7779) Guarding archival to not archive unintended commits
sivabalan narayanan created HUDI-7779: - Summary: Guarding archival to not archive unintended commits Key: HUDI-7779 URL: https://issues.apache.org/jira/browse/HUDI-7779 Project: Apache Hudi Issue Type: Bug Components: archiving Reporter: sivabalan narayanan Archiving commits from the active timeline could lead to data consistency issues on rare occasions. We should come up with proper guards to ensure we do not perform such unintended archival. The major gap we want to guard against is: if someone disabled the cleaner, archival should account for data consistency issues and ensure it bails out. We have a base guarding condition, where archival will stop at the earliest commit to retain based on the latest clean commit metadata. But there are a few other scenarios that need to be accounted for. a. Keeping aside replace commits, let's dive into specifics for regular commits and delta commits. Say the user configured clean commits retained to 4 and archival configs to 5 and 6. After t10, the cleaner is supposed to clean up all file versions created at or before t6. Say the cleaner did not run (for whatever reason) for the next 5 commits. Archival will certainly be guarded up to the earliest commit to retain based on the latest clean commit. Corner case to consider: a savepoint was added to, say, t3 and later removed, and the cleaner was still never re-enabled. Even though archival would have been stopped at t3 (while the savepoint is present), once the savepoint is removed, if archival is executed, it could archive commit t3. This means the file versions tracked at t3 are still not yet cleaned by the cleaner. Reasoning: We are good here w.r.t. data consistency. Until the cleaner runs next, these older file versions might be exposed to the end user. But time travel queries are not intended for already cleaned up commits, and hence this is not an issue. None of snapshot, time travel, or incremental queries will run into issues, as they are not supposed to poll for t3. 
At any later point, if the cleaner is re-enabled, it will take care of cleaning up file versions tracked at commit t3. b. The trickier part is when replace commits are involved. Since the replace commit metadata in the active timeline is what ensures the replaced file groups are ignored for reads, before archiving it, the cleaner is expected to clean them up fully. But are there chances this could go wrong? Corner case to consider: let's add on to the above scenario, where t3 has a savepoint, and t4 is a replace commit which replaced file groups tracked in t3. The cleaner will skip cleaning up files tracked by t3, but will clean up t4, t5 and t6. So the earliest commit to retain will be pointing to t6. And say the savepoint for t3 is removed, but the cleaner is disabled. In this state of the timeline, if archival is executed (since t3's savepoint is removed), archival might archive t3 and t4.rc. This could lead to data duplicates, as both the replaced file groups and the new file groups from t4.rc would be exposed as valid file groups. In other words, to summarize the different scenarios: i. The replaced file group is never cleaned up: ECTR (earliest commit to retain) is less than this.rc and we are good. ii. The replaced file group is cleaned up: ECTR is > this.rc and it is good to archive. iii. Tricky: ECTR moved ahead of this.rc, but due to a savepoint, full cleanup did not happen. After the savepoint is removed, when archival is executed, we should avoid archiving the rc of interest. This is the gap we don't account for as of now. We have 2 options to solve this. *Option A:* Before the archiver archives any replace commit, explicitly check that all replaced file groups are fully deleted. Cons: might need FileSystemView polling, which could be costly. *Option B:* The cleaner also tracks additional metadata named "fully cleaned up file groups" at the end of clean planning and in the completed clean commit metadata. 
So, instead of polling the FileSystemView (which might be costly), archival can check the clean commit metadata for the list of file groups and deduce whether all file groups replaced by X.rc are fully deleted. Pros: since the clean planner polls the file system view anyway and already has all file group info, no additional work might be required to deduce the "fully cleaned up file groups"; it just needs to add an additional piece of metadata. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7778) Duplicate Key exception with RLI
sivabalan narayanan created HUDI-7778: - Summary: Duplicate Key exception with RLI Key: HUDI-7778 URL: https://issues.apache.org/jira/browse/HUDI-7778 Project: Apache Hudi Issue Type: Bug Components: metadata Reporter: sivabalan narayanan We are occasionally hitting the exception below, meaning two records were ingested into RLI for the same record key from the data table. This is not expected to happen. {code:java} Caused by: org.apache.hudi.exception.HoodieAppendException: Failed while appending records to file:/var/folders/ym/8yjkm3n90kq8tk4gfmvk7y14gn/T/junit2792173348364470678/.hoodie/metadata/record_index/.record-index-0009-0_00011.log.3_3-275-476 at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:475) at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:439) at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:90) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:355) ... 28 moreCaused by: org.apache.hudi.exception.HoodieException: Writing multiple records with same key 1 not supported for org.apache.hudi.common.table.log.block.HoodieHFileDataBlock at org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.serializeRecords(HoodieHFileDataBlock.java:146) at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:121) at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:166) at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:467) ... 
31 more Driver stacktrace:51301 [main] INFO org.apache.spark.scheduler.DAGScheduler [] - Job 78 failed: collect at HoodieJavaRDD.java:177, took 0.245313 s51303 [main] INFO org.apache.hudi.client.BaseHoodieClient [] - Stopping Timeline service !!51303 [main] INFO org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closing Timeline server51303 [main] INFO org.apache.hudi.timeline.service.TimelineService [] - Closing Timeline Service51321 [main] INFO org.apache.hudi.timeline.service.TimelineService [] - Closed Timeline Service51321 [main] INFO org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closed Timeline server org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 197001012 at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:80) at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:47) at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:98) at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:88) at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:156) at org.apache.hudi.functional.TestGlobalIndexEnableUpdatePartitions.testUdpateSubsetOfRecUpdates(TestGlobalIndexEnableUpdatePartitions.java:225) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92) at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45
[jira] [Assigned] (HUDI-7778) Duplicate Key exception with RLI
[ https://issues.apache.org/jira/browse/HUDI-7778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7778: - Assignee: sivabalan narayanan > Duplicate Key exception with RLI > - > > Key: HUDI-7778 > URL: https://issues.apache.org/jira/browse/HUDI-7778 > Project: Apache Hudi > Issue Type: Bug > Components: metadata >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > > We are occasionally hitting an exception as below meaning, two records are > ingested to RLI for the same record key from data table. This is not expected > to happen. > > {code:java} > Caused by: org.apache.hudi.exception.HoodieAppendException: Failed while > appending records to > file:/var/folders/ym/8yjkm3n90kq8tk4gfmvk7y14gn/T/junit2792173348364470678/.hoodie/metadata/record_index/.record-index-0009-0_00011.log.3_3-275-476 > at > org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:475) > at > org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:439) > at > org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:90) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:355) > ... 28 moreCaused by: org.apache.hudi.exception.HoodieException: > Writing multiple records with same key 1 not supported for > org.apache.hudi.common.table.log.block.HoodieHFileDataBlock at > org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.serializeRecords(HoodieHFileDataBlock.java:146) > at > org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:121) > at > org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:166) > at > org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:467) > ... 
31 more > Driver stacktrace:51301 [main] INFO org.apache.spark.scheduler.DAGScheduler > [] - Job 78 failed: collect at HoodieJavaRDD.java:177, took 0.245313 s51303 > [main] INFO org.apache.hudi.client.BaseHoodieClient [] - Stopping Timeline > service !!51303 [main] INFO > org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closing Timeline > server51303 [main] INFO org.apache.hudi.timeline.service.TimelineService [] > - Closing Timeline Service51321 [main] INFO > org.apache.hudi.timeline.service.TimelineService [] - Closed Timeline > Service51321 [main] INFO > org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closed Timeline > server > org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit > time 197001012 > at > org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:80) >at > org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:47) > at > org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:98) > at > org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:88) > at > org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:156) > at > org.apache.hudi.functional.TestGlobalIndexEnableUpdatePartitions.testUdpateSubsetOfRecUpdates(TestGlobalIndexEnableUpdatePartitions.java:225) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) >at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) >at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92) >
[jira] [Assigned] (HUDI-7771) Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0
[ https://issues.apache.org/jira/browse/HUDI-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7771: - Assignee: sivabalan narayanan > Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0 > --- > > Key: HUDI-7771 > URL: https://issues.apache.org/jira/browse/HUDI-7771 > Project: Apache Hudi > Issue Type: Improvement > Components: writer-core >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > > We made "DefaultHoodieRecordPayload" as default for 1.x. but lets keep it as > OverwriteWithLatestAvroPayload for 0.15.10 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7771) Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0
[ https://issues.apache.org/jira/browse/HUDI-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7771: -- Fix Version/s: 0.15.0 > Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0 > --- > > Key: HUDI-7771 > URL: https://issues.apache.org/jira/browse/HUDI-7771 > Project: Apache Hudi > Issue Type: Improvement > Components: writer-core >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > Fix For: 0.15.0 > > > We made "DefaultHoodieRecordPayload" as default for 1.x. but lets keep it as > OverwriteWithLatestAvroPayload for 0.15.10 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7771) Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0
sivabalan narayanan created HUDI-7771: - Summary: Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0 Key: HUDI-7771 URL: https://issues.apache.org/jira/browse/HUDI-7771 Project: Apache Hudi Issue Type: Improvement Components: writer-core Reporter: sivabalan narayanan We made "DefaultHoodieRecordPayload" the default for 1.x, but let's keep it as OverwriteWithLatestAvroPayload for 0.15.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
(hudi) 01/01: Disabling flaky tests
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch branch-0.x-failing-tests-test-mult-writer-archival in repository https://gitbox.apache.org/repos/asf/hudi.git commit ebea9b7c5152d135610bb35de89cf1d9e1ab1449 Author: sivabalan AuthorDate: Wed May 15 15:58:11 2024 -0700 Disabling flaky tests --- .../src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index e9fccfc7054..dce049ca275 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -63,6 +63,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieSparkClientTestHarness; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -683,8 +684,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); } - @ParameterizedTest - @ValueSource(booleans = {false, true}) + @Disabled("HUDI-6386") public void testArchivalWithMultiWriters(boolean enableMetadata) throws Exception { HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 4, 5, 5, 2, HoodieTableType.COPY_ON_WRITE, false, 10, 209715200,
(hudi) branch branch-0.x-failing-tests-test-mult-writer-archival created (now ebea9b7c515)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a change to branch branch-0.x-failing-tests-test-mult-writer-archival in repository https://gitbox.apache.org/repos/asf/hudi.git at ebea9b7c515 Disabling flaky tests This branch includes the following new commits: new ebea9b7c515 Disabling flaky tests The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[jira] [Assigned] (HUDI-7768) Fix failing tests for 0.15.0 release (async compaction and metadata num commits check)
[ https://issues.apache.org/jira/browse/HUDI-7768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7768: - Assignee: sivabalan narayanan > Fix failing tests for 0.15.0 release (async compaction and metadata num > commits check) > -- > > Key: HUDI-7768 > URL: https://issues.apache.org/jira/browse/HUDI-7768 > Project: Apache Hudi > Issue Type: Improvement > Components: tests-ci > Reporter: sivabalan narayanan >Assignee: sivabalan narayanan >Priority: Major > Labels: pull-request-available > > > > |[https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=23953=logs=600e7de6-e133-5e69-e615-50ee129b3c08=bbbd7bcc-ae73-56b8-887a-cd2d6deaafc7] > [https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=23953=logs=7601efb9-4019-552e-11ba-eb31b66593b2=d4b4e11d-8e26-50e5-a0d9-bb2d5decfeb9] > org.apache.hudi.exception.HoodieMetadataException: Metadata table's > deltacommits exceeded 3: this is likely caused by a pending instant in the > data table. Resolve the pending instant or adjust > `hoodie.metadata.max.deltacommits.when_pending`, then restart the pipeline. 
> at > org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.checkNumDeltaCommits([HoodieBackedTableMetadataWriter.java:835|http://hoodiebackedtablemetadatawriter.java:835/]) > at > org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.validateTimelineBeforeSchedulingCompaction([HoodieBackedTableMetadataWriter.java:1367|http://hoodiebackedtablemetadatawriter.java:1367/]) > java.lang.IllegalArgumentException: Following instants have timestamps >= > compactionInstant (002) Instants > :[[004__deltacommit__COMPLETED__20240515123806398]] at > org.apache.hudi.common.util.ValidationUtils.checkArgument([ValidationUtils.java:42|http://validationutils.java:42/]) > at > org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute([ScheduleCompactionActionExecutor.java:108|http://schedulecompactionactionexecutor.java:108/]) > | -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7768) Fix failing tests for 0.15.0 release (async compaction and metadata num commits check)
sivabalan narayanan created HUDI-7768: - Summary: Fix failing tests for 0.15.0 release (async compaction and metadata num commits check) Key: HUDI-7768 URL: https://issues.apache.org/jira/browse/HUDI-7768 Project: Apache Hudi Issue Type: Improvement Components: tests-ci Reporter: sivabalan narayanan |[https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=23953=logs=600e7de6-e133-5e69-e615-50ee129b3c08=bbbd7bcc-ae73-56b8-887a-cd2d6deaafc7] [https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=23953=logs=7601efb9-4019-552e-11ba-eb31b66593b2=d4b4e11d-8e26-50e5-a0d9-bb2d5decfeb9] org.apache.hudi.exception.HoodieMetadataException: Metadata table's deltacommits exceeded 3: this is likely caused by a pending instant in the data table. Resolve the pending instant or adjust `hoodie.metadata.max.deltacommits.when_pending`, then restart the pipeline. at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.checkNumDeltaCommits(HoodieBackedTableMetadataWriter.java:835) at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.validateTimelineBeforeSchedulingCompaction(HoodieBackedTableMetadataWriter.java:1367) java.lang.IllegalArgumentException: Following instants have timestamps >= compactionInstant (002) Instants :[[004__deltacommit__COMPLETED__20240515123806398]] at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42) at org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute(ScheduleCompactionActionExecutor.java:108) | -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7756) Audit all base file readers and replace w/ file slice readers
sivabalan narayanan created HUDI-7756: - Summary: Audit all base file readers and replace w/ file slice readers Key: HUDI-7756 URL: https://issues.apache.org/jira/browse/HUDI-7756 Project: Apache Hudi Issue Type: Improvement Components: reader-core Reporter: sivabalan narayanan If the file slice reader is as performant as a base file reader when there are no log files, we should replace all base file readers w/ file slice readers, so that both the COW and MOR code paths are unified. -- This message was sent by Atlassian Jira (v8.20.10#820010)
(hudi) branch master updated: [HUDI-7654] Optimizing BQ sync for MDT (#11061)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new c75ae7f9ba2 [HUDI-7654] Optimizing BQ sync for MDT (#11061) c75ae7f9ba2 is described below commit c75ae7f9ba21bbf9ad9101415d632ed91af27712 Author: Sivabalan Narayanan AuthorDate: Fri May 10 09:57:02 2024 -0700 [HUDI-7654] Optimizing BQ sync for MDT (#11061) * Optimizing BQ sync for MDT * Adding tests --- .../hudi/sync/common/util/ManifestFileWriter.java | 51 ++--- .../utilities/TestManifestFileWriterSpark.java | 117 + 2 files changed, 151 insertions(+), 17 deletions(-) diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java index 2a15997ab21..3eaf80dddfe 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java @@ -19,13 +19,17 @@ package org.apache.hudi.sync.common.util; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewManager; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.metadata.HoodieMetadataFileSystemView; +import org.apache.hudi.storage.StorageConfiguration; import 
org.apache.hudi.storage.StoragePath; import org.apache.hadoop.fs.Path; @@ -81,25 +85,14 @@ public class ManifestFileWriter { } } + @VisibleForTesting public static Stream fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient, boolean useFileListingFromMetadata, boolean useAbsolutePath) { try { - HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(metaClient.getStorageConf()); - HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient, - metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), - HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).build()); - Stream allLatestBaseFiles; - if (useFileListingFromMetadata) { -LOG.info("Fetching all base files from MDT."); -fsView.loadAllPartitions(); -allLatestBaseFiles = fsView.getLatestBaseFiles(); - } else { -List partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getStorageConf()), -metaClient.getBasePathV2().toString(), false); -LOG.info("Retrieve all partitions from fs: {}", partitions.size()); -allLatestBaseFiles = partitions.parallelStream().flatMap(fsView::getLatestBaseFiles); - } - return allLatestBaseFiles.map(useAbsolutePath ? HoodieBaseFile::getPath : HoodieBaseFile::getFileName); + StorageConfiguration storageConf = metaClient.getStorageConf(); + HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(storageConf); + boolean canUseMetadataTable = useFileListingFromMetadata && metaClient.getTableConfig().isMetadataTableAvailable(); + return getLatestBaseFiles(canUseMetadataTable, engContext, metaClient, useAbsolutePath); } catch (Exception e) { throw new HoodieException("Error in fetching latest base files.", e); } @@ -109,6 +102,30 @@ public class ManifestFileWriter { return new StoragePath(metaClient.getMetaPath(), useAbsolutePath ? 
ABSOLUTE_PATH_MANIFEST_FOLDER_NAME : MANIFEST_FOLDER_NAME); } + @VisibleForTesting + static Stream getLatestBaseFiles(boolean canUseMetadataTable, HoodieEngineContext engContext, HoodieTableMetaClient metaClient, + boolean useAbsolutePath) { +List partitions = FSUtils.getAllPartitionPaths(engContext, metaClient.getBasePath(), canUseMetadataTable); +LOG.info("Retrieve all partitions: " + partitions.size()); +HoodieTableFileSystemView fsView = null; +try { + fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engContext, metaClient, + HoodieMetadataConfig.newBuilder().enable(canUseMetadataTable).build(), + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); +
[jira] [Created] (HUDI-7716) Add more logs around index lookup
sivabalan narayanan created HUDI-7716: - Summary: Add more logs around index lookup Key: HUDI-7716 URL: https://issues.apache.org/jira/browse/HUDI-7716 Project: Apache Hudi Issue Type: Improvement Components: index Reporter: sivabalan narayanan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7712) Account for file slices instead of just base files while initializing RLI for MOR table
sivabalan narayanan created HUDI-7712: - Summary: Account for file slices instead of just base files while initializing RLI for MOR table Key: HUDI-7712 URL: https://issues.apache.org/jira/browse/HUDI-7712 Project: Apache Hudi Issue Type: Bug Components: metadata Reporter: sivabalan narayanan We could have deletes in log files, and hence we need to account for the entire file slice instead of just base files while initializing RLI for a MOR table. -- This message was sent by Atlassian Jira (v8.20.10#820010)
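The point of HUDI-7712 above can be sketched in a few lines. This is an illustrative example only (the method and class names are hypothetical, not Hudi's actual API): the set of live record keys for an MOR file slice is the base file's keys minus the keys deleted in that slice's log files, so indexing base files alone would retain deleted keys.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch (not Hudi's API): when initializing a record-level
// index (RLI) for an MOR table, live keys come from the whole file slice --
// base file keys minus keys deleted in its log files -- not from the base
// file alone.
public class FileSliceKeys {

    static Set<String> liveKeys(Set<String> baseFileKeys, Set<String> logFileDeletes) {
        Set<String> live = new HashSet<>(baseFileKeys);
        live.removeAll(logFileDeletes); // honor deletes recorded in log files
        return live;
    }

    public static void main(String[] args) {
        Set<String> base = new HashSet<>(Arrays.asList("k1", "k2", "k3"));
        Set<String> deletes = new HashSet<>(Arrays.asList("k2"));
        // k2 was deleted via a log file and must not be indexed
        System.out.println(liveKeys(base, deletes));
    }
}
```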
[jira] [Assigned] (HUDI-7673) Enhance RLI validation w/ MDT validator for false positives
[ https://issues.apache.org/jira/browse/HUDI-7673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7673: - Assignee: sivabalan narayanan > Enhance RLI validation w/ MDT validator for false positives > --- > > Key: HUDI-7673 > URL: https://issues.apache.org/jira/browse/HUDI-7673 > Project: Apache Hudi > Issue Type: Improvement > Components: metadata >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > Labels: pull-request-available > > There is a chance that we could see false positive failures w/ MDT validation > when RLI is validated. > > When FS based record key locations are polled, we could have a pending > commit, and when MDT is polled for record locations, the commit could have > been completed. And so, RLI validation could return additional record > locations. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
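One way to picture the false-positive window described in HUDI-7673 above: a record location that appears only on the MDT side is not a genuine mismatch if its commit completed between the FS-based poll and the MDT poll. The sketch below is hypothetical (the validator may handle this differently; all names are illustrative):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch (not the validator's real code): a key present only in
// the MDT listing is excluded from the error set when its commit completed
// after the FS-based listing was taken -- the race described in HUDI-7673.
public class RliFalsePositiveFilter {

    static Set<String> trueMismatches(Map<String, String> keyToCommitFromFs,
                                      Map<String, String> keyToCommitFromMdt,
                                      Set<String> commitsCompletedAfterFsPoll) {
        Set<String> mismatches = new HashSet<>();
        for (Map.Entry<String, String> e : keyToCommitFromMdt.entrySet()) {
            boolean missingOnFs = !keyToCommitFromFs.containsKey(e.getKey());
            boolean racedCommit = commitsCompletedAfterFsPoll.contains(e.getValue());
            if (missingOnFs && !racedCommit) {
                mismatches.add(e.getKey()); // a genuine inconsistency
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        Map<String, String> fs = new HashMap<>();
        fs.put("k1", "c1");
        Map<String, String> mdt = new HashMap<>(fs);
        mdt.put("k2", "c2"); // c2 completed between the two polls
        Set<String> raced = new HashSet<>();
        raced.add("c2");
        System.out.println(trueMismatches(fs, mdt, raced)); // prints []
    }
}
```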
[jira] [Assigned] (HUDI-7687) Instant should not be archived until replaced file groups or older file versions are deleted
[ https://issues.apache.org/jira/browse/HUDI-7687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7687: - Assignee: sivabalan narayanan > Instant should not be archived until replaced file groups or older file > versions are deleted > > > Key: HUDI-7687 > URL: https://issues.apache.org/jira/browse/HUDI-7687 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Krishen Bhan > Assignee: sivabalan narayanan >Priority: Minor > Labels: archive, clean > > When archival runs it may consider an instant as a candidate for archival > even if the file groups said instant replaced/updated still need to undergo a > `clean`. For example, consider the following scenario with clean and archival > scheduled/executed independently in different jobs > # Insert at C1 creates file group f1 in partition > # Replacecommit at RC2 creates file group f2 in partition, and replaces f1 > # Any reader of partition that calls HUDI API (with or without using MDT) > will recognize that f1 should be ignored, as it has been replaced. This is > since RC2 instant file is in active timeline > # Some more instants are added to timeline. RC2 is now eligible to be > cleaned (as per the table writers' clean policy). Assume though that file > groups replaced by RC2 haven't been deleted yet, such as due to clean > repeatedly failing, async clean not being scheduled yet, or the clean failing > to delete said file groups. > # An archive job eventually is triggered, and archives C1 and RC2. Note that > f1 is still in partition > Now the table has the same consistency issue as seen in > https://issues.apache.org/jira/browse/HUDI-7655 , where replaced file groups > are still in partition and readers may see inconsistent data. > > This situation can be avoided by ensuring that archival will "block" and not > go past an older instant time if it sees that said instant didn't undergo a > clean yet. -- This message was sent by Atlassian Jira (v8.20.10#820010)
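The "block archival" idea in HUDI-7687 above reduces to a simple rule. The following is a minimal sketch under illustrative names (not Hudi's actual archival code): archival only considers instants strictly before the earliest instant that clean has not yet processed, so a replacecommit whose replaced file groups still exist on storage stays in the active timeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the guard described in HUDI-7687 (names are
// illustrative, not Hudi's API): archival never moves past the earliest
// instant that clean has not yet processed.
public class ArchivalGuard {

    /** Instants safe to archive: strictly before the earliest uncleaned instant. */
    static List<String> candidatesForArchival(List<String> completedInstants,
                                              String earliestUncleanedInstant) {
        List<String> safe = new ArrayList<>();
        for (String instant : completedInstants) {
            if (instant.compareTo(earliestUncleanedInstant) >= 0) {
                break; // block archival here: clean has not caught up yet
            }
            safe.add(instant);
        }
        return safe;
    }

    public static void main(String[] args) {
        // "001" (C1) wrote f1; "002" (RC2) replaced f1, but f1 is not deleted yet.
        List<String> timeline = Arrays.asList("001", "002", "003", "004");
        // Clean has only progressed up to (not including) "002", so only "001"
        // may be archived and RC2 stays visible to readers.
        System.out.println(candidatesForArchival(timeline, "002")); // prints [001]
    }
}
```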
[jira] [Assigned] (HUDI-7655) Support configuration for clean to fail execution if there is at least one file is marked as a failed delete
[ https://issues.apache.org/jira/browse/HUDI-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7655: - Assignee: sivabalan narayanan > Support configuration for clean to fail execution if there is at least one > file is marked as a failed delete > > > Key: HUDI-7655 > URL: https://issues.apache.org/jira/browse/HUDI-7655 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Krishen Bhan > Assignee: sivabalan narayanan >Priority: Minor > Labels: clean > > When a HUDI clean plan is executed, any targeted file that was not confirmed > as deleted (or non-existing) will be marked as a "failed delete". Although > these failed deletes will be added to `.clean` metadata, if incremental clean > is used then these files might not ever be picked up again by a future clean > plan, unless a "full-scan" clean ends up being scheduled. In addition to > leading to more files unnecessarily taking up storage space for longer, this > can lead to the following dataset consistency issue for COW datasets: > # Insert at C1 creates file group f1 in partition > # Replacecommit at RC2 creates file group f2 in partition, and replaces f1 > # Any reader of partition that calls HUDI API (with or without using MDT) > will recognize that f1 should be ignored, as it has been replaced. This is > since RC2 instant file is in active timeline > # Some completed instants later an incremental clean is scheduled. It moves > the "earliest commit to retain" to a time after instant time RC2, so it > targets f1 for deletion. But during execution of the plan, it fails to delete > f1. > # An archive job eventually is triggered, and archives C1 and RC2. Note that > f1 is still in partition > At this point, any job/query that reads the aforementioned partition directly > from the DFS file system calls (without directly using MDT FILES partition) > will consider both f1 and f2 as valid file groups, since RC2 is no longer in active timeline.
This is a data consistency issue, and will only be resolved > if a "full-scan" clean is triggered and deletes f1. > This specific scenario can be avoided if the user can configure HUDI clean to > fail execution of a clean plan unless all files are confirmed as deleted (or > not existing in DFS already), "blocking" the clean. The next clean attempt > will re-execute this existing plan, since clean plans cannot be "rolled > back". -- This message was sent by Atlassian Jira (v8.20.10#820010)
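The behavior HUDI-7655 proposes can be sketched briefly. This is an illustrative example under assumed names (not actual Hudi code): with the strict flag on, clean execution fails if any targeted file could not be confirmed deleted, so the same plan is re-executed on the next attempt instead of the file being silently dropped from future plans.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the proposed config (not actual Hudi code): fail
// the clean if any delete was not confirmed, "blocking" the clean so the
// plan is retried -- clean plans cannot be rolled back.
public class StrictClean {

    /** Returns the number of confirmed deletes; throws if strict and any delete failed. */
    static int executeCleanPlan(Map<String, Boolean> deleteResults, boolean failOnFailedDelete) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : deleteResults.entrySet()) {
            if (!e.getValue()) {
                failed.add(e.getKey()); // delete was not confirmed
            }
        }
        if (!failed.isEmpty() && failOnFailedDelete) {
            // Failing here forces the next clean attempt to re-execute this plan.
            throw new IllegalStateException("Failed deletes: " + failed);
        }
        return deleteResults.size() - failed.size();
    }

    public static void main(String[] args) {
        Map<String, Boolean> results = new LinkedHashMap<>();
        results.put("f1", false); // e.g. a transient DFS error left f1 in place
        results.put("f2", true);
        System.out.println(executeCleanPlan(results, false)); // prints 1
        try {
            executeCleanPlan(results, true);
        } catch (IllegalStateException e) {
            System.out.println("clean blocked: " + e.getMessage());
        }
    }
}
```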
[jira] [Created] (HUDI-7673) Enhance RLI validation w/ MDT validator for false positives
sivabalan narayanan created HUDI-7673: - Summary: Enhance RLI validation w/ MDT validator for false positives Key: HUDI-7673 URL: https://issues.apache.org/jira/browse/HUDI-7673 Project: Apache Hudi Issue Type: Improvement Components: metadata Reporter: sivabalan narayanan There is a chance that we could see false positive failures w/ MDT validation when RLI is validated. When FS based record key locations are polled, we could have a pending commit, and when MDT is polled for record locations, the commit could have been completed. And so, RLI validation could return additional record locations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7659) Update 0.14.0 release docs to call out that row writer w/ clustering is enabled by default
sivabalan narayanan created HUDI-7659: - Summary: Update 0.14.0 release docs to call out that row writer w/ clustering is enabled by default Key: HUDI-7659 URL: https://issues.apache.org/jira/browse/HUDI-7659 Project: Apache Hudi Issue Type: Improvement Components: docs Reporter: sivabalan narayanan Update 0.14.0 release docs to call out that row writer w/ clustering is enabled by default -- This message was sent by Atlassian Jira (v8.20.10#820010)
(hudi) branch master updated: [HUDI-7655] Minor fix to rli validation with MDT validator (#11060)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 0dbeb77b92a [HUDI-7655] Minor fix to rli validation with MDT validator (#11060) 0dbeb77b92a is described below commit 0dbeb77b92a786ed4c4e728533e61927b93de70b Author: Sivabalan Narayanan AuthorDate: Sun Apr 21 11:19:45 2024 -0700 [HUDI-7655] Minor fix to rli validation with MDT validator (#11060) --- .../org/apache/hudi/utilities/HoodieMetadataTableValidator.java | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 75ff9a41fc0..0e9820a5c9b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -959,6 +959,7 @@ public class HoodieMetadataTableValidator implements Serializable { int numErrorSamples = cfg.numRecordIndexErrorSamples; Pair> result = keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, cfg.recordIndexParallelism) .map(e -> { + String recordKey = e._1; Optional> locationOnFs = e._2._1; Optional> locationFromRecordIndex = e._2._2; List errorSampleList = new ArrayList<>(); @@ -967,13 +968,13 @@ public class HoodieMetadataTableValidator implements Serializable { && locationOnFs.get().getRight().equals(locationFromRecordIndex.get().getRight())) { return Pair.of(0L, errorSampleList); } -errorSampleList.add(constructLocationInfoString(locationOnFs, locationFromRecordIndex)); +errorSampleList.add(constructLocationInfoString(recordKey, locationOnFs, locationFromRecordIndex)); return Pair.of(1L, errorSampleList); } if (!locationOnFs.isPresent() && 
!locationFromRecordIndex.isPresent()) { return Pair.of(0L, errorSampleList); } - errorSampleList.add(constructLocationInfoString(locationOnFs, locationFromRecordIndex)); + errorSampleList.add(constructLocationInfoString(recordKey, locationOnFs, locationFromRecordIndex)); return Pair.of(1L, errorSampleList); }) .reduce((pair1, pair2) -> { @@ -1030,9 +1031,10 @@ public class HoodieMetadataTableValidator implements Serializable { } } - private String constructLocationInfoString(Optional> locationOnFs, + private String constructLocationInfoString(String recordKey, Optional> locationOnFs, Optional> locationFromRecordIndex) { StringBuilder sb = new StringBuilder(); +sb.append("Record key " + recordKey + " -> "); sb.append("FS: "); if (locationOnFs.isPresent()) { sb.append(locationOnFs.get());
[jira] [Created] (HUDI-7645) Optimize BQ sync tool for MDT
sivabalan narayanan created HUDI-7645: - Summary: Optimize BQ sync tool for MDT Key: HUDI-7645 URL: https://issues.apache.org/jira/browse/HUDI-7645 Project: Apache Hudi Issue Type: Improvement Components: meta-sync Reporter: sivabalan narayanan Looks like in BQ sync, we are polling fsview for latest files sequentially for every partition. When MDT is enabled, we could load all partitions in one call. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7644) Add record key info with RLI validation in MDT Validator
[ https://issues.apache.org/jira/browse/HUDI-7644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7644: -- Fix Version/s: 1.0.0 > Add record key info with RLI validation in MDT Validator > > > Key: HUDI-7644 > URL: https://issues.apache.org/jira/browse/HUDI-7644 > Project: Apache Hudi > Issue Type: Improvement > Components: metadata, tests-ci >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > Fix For: 0.15.0, 1.0.0 > > > Add record key info with RLI validation in MDT Validator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7644) Add record key info with RLI validation in MDT Validator
[ https://issues.apache.org/jira/browse/HUDI-7644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7644: -- Fix Version/s: 0.15.0 > Add record key info with RLI validation in MDT Validator > > > Key: HUDI-7644 > URL: https://issues.apache.org/jira/browse/HUDI-7644 > Project: Apache Hudi > Issue Type: Improvement > Components: metadata, tests-ci >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > Fix For: 0.15.0 > > > Add record key info with RLI validation in MDT Validator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (HUDI-7644) Add record key info with RLI validation in MDT Validator
[ https://issues.apache.org/jira/browse/HUDI-7644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7644: - Assignee: sivabalan narayanan > Add record key info with RLI validation in MDT Validator > > > Key: HUDI-7644 > URL: https://issues.apache.org/jira/browse/HUDI-7644 > Project: Apache Hudi > Issue Type: Improvement > Components: metadata, tests-ci >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > > Add record key info with RLI validation in MDT Validator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7644) Add record key info with RLI validation in MDT Validator
sivabalan narayanan created HUDI-7644: - Summary: Add record key info with RLI validation in MDT Validator Key: HUDI-7644 URL: https://issues.apache.org/jira/browse/HUDI-7644 Project: Apache Hudi Issue Type: Improvement Components: metadata, tests-ci Reporter: sivabalan narayanan Add record key info with RLI validation in MDT Validator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7641) Add metrics to track what partitions are enabled in MDT
[ https://issues.apache.org/jira/browse/HUDI-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7641: -- Fix Version/s: 0.15.0 > Add metrics to track what partitions are enabled in MDT > --- > > Key: HUDI-7641 > URL: https://issues.apache.org/jira/browse/HUDI-7641 > Project: Apache Hudi > Issue Type: Improvement > Components: metadata >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > Labels: pull-request-available > Fix For: 0.15.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (HUDI-7641) Add metrics to track what partitions are enabled in MDT
[ https://issues.apache.org/jira/browse/HUDI-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7641: - Assignee: sivabalan narayanan > Add metrics to track what partitions are enabled in MDT > --- > > Key: HUDI-7641 > URL: https://issues.apache.org/jira/browse/HUDI-7641 > Project: Apache Hudi > Issue Type: Improvement > Components: metadata >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
(hudi) branch master updated: [HUDI-7618] Add ability to ignore checkpoints in delta streamer (#11018)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new ca77fda51fe [HUDI-7618] Add ability to ignore checkpoints in delta streamer (#11018) ca77fda51fe is described below commit ca77fda51fe3036f86d4ddb8b0e58a2f160882dc Author: Sampan S Nayak AuthorDate: Fri Apr 19 11:55:43 2024 +0530 [HUDI-7618] Add ability to ignore checkpoints in delta streamer (#11018) --- .../hudi/utilities/streamer/HoodieStreamer.java| 7 +++ .../apache/hudi/utilities/streamer/StreamSync.java | 13 - .../streamer/TestStreamSyncUnitTests.java | 61 ++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java index 59c1bf3d164..0dd488bffcb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java @@ -428,6 +428,13 @@ public class HoodieStreamer implements Serializable { @Parameter(names = {"--config-hot-update-strategy-class"}, description = "Configuration hot update in continuous mode") public String configHotUpdateStrategyClass = ""; +@Parameter(names = {"--ignore-checkpoint"}, description = "Set this config with a unique value, recommend using a timestamp value or UUID." ++ " Setting this config indicates that the subsequent sync should ignore the last committed checkpoint for the source. The config value is stored" ++ " in the commit history, so setting the config with same values would not have any affect. 
This config can be used in scenarios like kafka topic change," ++ " where we would want to start ingesting from the latest or earliest offset after switching the topic (in this case we would want to ignore the previously" ++ " committed checkpoint, and rely on other configs to pick the starting offsets).") +public String ignoreCheckpoint = null; + public boolean isAsyncCompactionEnabled() { return continuousMode && !forceDisableCompaction && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType)); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index c9521058b12..2f5bd1fd3ff 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -164,6 +164,7 @@ public class StreamSync implements Serializable, Closeable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class); private static final String NULL_PLACEHOLDER = "[null]"; + public static final String CHECKPOINT_IGNORE_KEY = "deltastreamer.checkpoint.ignore_key"; /** * Delta Sync Config. @@ -733,7 +734,8 @@ public class StreamSync implements Serializable, Closeable { * @return the checkpoint to resume from if applicable. 
* @throws IOException */ - private Option getCheckpointToResume(Option commitsTimelineOpt) throws IOException { + @VisibleForTesting + Option getCheckpointToResume(Option commitsTimelineOpt) throws IOException { Option resumeCheckpointStr = Option.empty(); // try get checkpoint from commits(including commit and deltacommit) // in COW migrating to MOR case, the first batch of the deltastreamer will lost the checkpoint from COW table, cause the dataloss @@ -750,7 +752,11 @@ public class StreamSync implements Serializable, Closeable { if (commitMetadataOption.isPresent()) { HoodieCommitMetadata commitMetadata = commitMetadataOption.get(); LOG.debug("Checkpoint reset from metadata: " + commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)); -if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)) +if (cfg.ignoreCheckpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY)) +|| !cfg.ignoreCheckpoint.equals(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY { + // we ignore any existing checkpoint and start ingesting afresh + resumeCheckpointStr = Option.empty(); +} else if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)) || !cfg.checkpoint.equals(c
[jira] [Created] (HUDI-7641) Add metrics to track what partitions are enabled in MDT
sivabalan narayanan created HUDI-7641: - Summary: Add metrics to track what partitions are enabled in MDT Key: HUDI-7641 URL: https://issues.apache.org/jira/browse/HUDI-7641 Project: Apache Hudi Issue Type: Improvement Components: metadata Reporter: sivabalan narayanan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7624) Fix index lookup duration to track tag location duration
sivabalan narayanan created HUDI-7624: - Summary: Fix index lookup duration to track tag location duration Key: HUDI-7624 URL: https://issues.apache.org/jira/browse/HUDI-7624 Project: Apache Hudi Issue Type: Bug Components: index Reporter: sivabalan narayanan With Spark's lazy evaluation, we can't simply start a timer before the tagLocation call and stop it afterwards; that may not give us the right value for the tag location duration. So, we need to fix how the duration is measured. -- This message was sent by Atlassian Jira (v8.20.10#820010)
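The measurement problem in HUDI-7624 above comes from timing a lazily evaluated computation. A Spark-free sketch of the fix idea, using a plain `Supplier` to stand in for a lazy RDD transformation (all names illustrative): the result must be forced inside the timed region, otherwise the timer only measures plan construction.

```java
import java.util.function.Supplier;

// Sketch of the fix idea (illustrative, no Spark dependency): with lazy
// evaluation, a timer wrapped around *building* the computation measures
// almost nothing; forcing the result inside the timed region -- analogous
// to triggering a Spark action -- yields the real duration.
public class LazyTiming {

    /** Times the actual evaluation by forcing the lazy result inside the timer. */
    static long timeForcedMillis(Supplier<Integer> lazyResult) {
        long start = System.nanoTime();
        lazyResult.get(); // force evaluation here, inside the timed region
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        Supplier<Integer> slow = () -> {
            try {
                Thread.sleep(50); // stands in for the real tagLocation work
            } catch (InterruptedException ignored) {
            }
            return 42;
        };
        // Timing only the construction of `slow` would report ~0 ms;
        // forcing it inside the timed region reports the true cost.
        System.out.println(timeForcedMillis(slow) >= 45);
    }
}
```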
(hudi) branch master updated: [HUDI-7606] Unpersist RDDs after table services, mainly compaction and clustering (#11000)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 48952ae5dfd [HUDI-7606] Unpersist RDDs after table services, mainly compaction and clustering (#11000) 48952ae5dfd is described below commit 48952ae5dfd82f84cc338886e653cf1412e8cda8 Author: Rajesh Mahindra <76502047+rmahindra...@users.noreply.github.com> AuthorDate: Sun Apr 14 14:38:55 2024 -0700 [HUDI-7606] Unpersist RDDs after table services, mainly compaction and clustering (#11000) - Co-authored-by: rmahindra123 --- .../hudi/client/BaseHoodieTableServiceClient.java | 12 .../apache/hudi/client/BaseHoodieWriteClient.java | 2 +- .../hudi/client/SparkRDDTableServiceClient.java| 6 ++ .../apache/hudi/client/SparkRDDWriteClient.java| 21 +-- .../hudi/client/utils/SparkReleaseResources.java | 64 ++ 5 files changed, 85 insertions(+), 20 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 22f1a9995bd..228eaf4d554 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -333,6 +333,7 @@ public abstract class BaseHoodieTableServiceClient extends BaseHoodieCl CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); } finally { this.txnManager.endTransaction(Option.of(compactionInstant)); + releaseResources(compactionCommitTime); } WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); @@ -393,6 +394,7 @@ public abstract class BaseHoodieTableServiceClient 
extends BaseHoodieCl CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata); } finally { this.txnManager.endTransaction(Option.of(logCompactionInstant)); + releaseResources(logCompactionCommitTime); } WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); @@ -534,6 +536,7 @@ public abstract class BaseHoodieTableServiceClient extends BaseHoodieCl throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e); } finally { this.txnManager.endTransaction(Option.of(clusteringInstant)); + releaseResources(clusteringCommitTime); } WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); @@ -779,6 +782,7 @@ public abstract class BaseHoodieTableServiceClient extends BaseHoodieCl + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + " cleanerElapsedMs" + durationMs); } +releaseResources(cleanInstantTime); return metadata; } @@ -1171,4 +1175,12 @@ public abstract class BaseHoodieTableServiceClient extends BaseHoodieCl } } } + + /** + * Called after each commit of a compaction or clustering table service, + * to release any resources used. 
+ */ + protected void releaseResources(String instantTime) { +// do nothing here + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 9ade694d340..2b9bc83091b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -236,11 +236,11 @@ public abstract class BaseHoodieWriteClient extends BaseHoodieClient commit(table, commitActionType, instantTime, metadata, stats, writeStatuses); postCommit(table, metadata, instantTime, extraMetadata); LOG.info("Committed " + instantTime); - releaseResources(instantTime); } catch (IOException e) { throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e); } finally { this.txnManager.endTransaction(Option.of(inflightInstant)); + releaseResources(instantTime); } // trigger clean and archival. diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/Spa
[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7507: -- Fix Version/s: 1.0.0 > ongoing concurrent writers with smaller timestamp can cause issues with > table services > --- > > Key: HUDI-7507 > URL: https://issues.apache.org/jira/browse/HUDI-7507 > Project: Apache Hudi > Issue Type: Improvement > Components: table-service >Reporter: Krishen Bhan >Priority: Major > Fix For: 0.15.0, 1.0.0 > > Attachments: Flowchart (1).png, Flowchart.png > > > Although HUDI operations hold a table lock when creating a .requested > instant, because HUDI writers do not generate a timestamp and create a > .requested plan in the same transaction, there can be a scenario where > # Job 1 starts, chooses timestamp (x), Job 2 starts and chooses timestamp > (x - 1) > # Job 1 schedules and creates a requested file with instant timestamp (x) > # Job 2 schedules and creates a requested file with instant timestamp (x-1) > # Both jobs continue running > If one job is writing a commit and the other is a table service, this can > cause issues: > * > ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then > Job 1 can run before Job 2 and create a compaction plan for all instant > times (up to (x)) that doesn’t include instant time (x-1). Later Job 2 > will create instant time (x-1), but the timeline will be in a corrupted state > since the compaction plan was supposed to include (x-1) > ** There is a similar issue with clean. If Job 2 is a long-running commit > (that was stuck/delayed for a while before creating its .requested plan) and > Job 1 is a clean, then Job 1 can perform a clean that updates the > earliest-commit-to-retain without waiting for the inflight instant by Job 2 > at (x-1) to complete. This causes Job 2 to be "skipped" by clean. > [Edit] I added a diagram to visualize the issue, specifically the second > scenario with clean > !Flowchart (1).png! 
> > One way this can be resolved is by combining the operations of generating > the instant time and creating a requested file in the same HUDI table > transaction. Specifically, executing the following steps whenever any instant > (commit, table service, etc.) is scheduled > # Acquire table lock > # Look at the latest instant C on the active timeline (completed or not). > Generate a timestamp after C > # Create the plan and requested file using this new timestamp (that is > greater than C) > # Release table lock > Unfortunately this has the following drawbacks > * Every operation must now hold the table lock when computing its plan, even > if it's an expensive operation and will take a while > * Users of HUDI cannot easily set their own instant time of an operation, > and this restriction would break any public APIs that allow this > -An alternate approach (suggested by- [~pwason] -) was to instead have all > operations including table services perform conflict resolution checks before > committing. For example, clean and compaction would generate their plan as > usual. But when creating a transaction to write a .requested file, right > before creating the file they should check if another lower-timestamp instant > has appeared in the timeline. And if so, they should fail/abort without > creating the plan. Commit operations would also be updated/verified to have a > similar check: before creating a .requested file (during a transaction) the > commit operation will check if a table service plan (clean/compact) with a > greater instant time has been created. And if so, it would abort/fail. This > avoids the drawbacks of the first approach, but will lead to more transient > failures that users have to handle.- > > An alternate approach is to have every operation abort creating a .requested > file unless it has the latest timestamp. 
Specifically, for any instant type, > whenever an operation is about to create a .requested plan on the timeline, it > should take the table lock and assert that there are no other instants on the > timeline (inflight or otherwise) that are greater than it. If that assertion > fails, then throw a retryable conflict resolution exception. -- This message was sent by Atlassian Jira (v8.20.10#820010)
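The "latest-timestamp guard" proposed above can be sketched as follows. This is a minimal illustration under stated assumptions (an in-memory timeline and made-up class names, not Hudi code): under the table lock, check for any instant greater than the candidate and throw a retryable conflict exception if one exists.

```java
import java.util.TreeSet;

// Illustrative scheduler enforcing "no .requested unless latest timestamp".
class InstantScheduler {
  static class RetryableConflictException extends RuntimeException {
    RetryableConflictException(String msg) { super(msg); }
  }

  // All instants on the timeline, inflight or completed (lexicographic order
  // matches Hudi-style timestamp strings).
  private final TreeSet<String> timeline = new TreeSet<>();

  // "synchronized" stands in for the table lock held during the transaction.
  synchronized void createRequested(String instantTime) {
    String newer = timeline.higher(instantTime);
    if (newer != null) {
      throw new RetryableConflictException(
          "Instant " + newer + " is newer than " + instantTime + "; retry with a fresh timestamp");
    }
    timeline.add(instantTime); // represents writing the .requested file
  }
}
```

A caller that catches `RetryableConflictException` would simply pick a fresh timestamp and retry, which is why the exception is described as retryable rather than fatal.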
[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7507: -- Fix Version/s: 0.15.0 > ongoing concurrent writers with smaller timestamp can cause issues with > table services > --- > > Key: HUDI-7507 > URL: https://issues.apache.org/jira/browse/HUDI-7507 > Project: Apache Hudi > Issue Type: Improvement > Components: table-service >Reporter: Krishen Bhan >Priority: Major > Fix For: 0.15.0 > > Attachments: Flowchart (1).png, Flowchart.png > > > Although HUDI operations hold a table lock when creating a .requested > instant, because HUDI writers do not generate a timestamp and create a > .requested plan in the same transaction, there can be a scenario where > # Job 1 starts, chooses timestamp (x), Job 2 starts and chooses timestamp > (x - 1) > # Job 1 schedules and creates a requested file with instant timestamp (x) > # Job 2 schedules and creates a requested file with instant timestamp (x-1) > # Both jobs continue running > If one job is writing a commit and the other is a table service, this can > cause issues: > * > ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then > Job 1 can run before Job 2 and create a compaction plan for all instant > times (up to (x)) that doesn’t include instant time (x-1). Later Job 2 > will create instant time (x-1), but the timeline will be in a corrupted state > since the compaction plan was supposed to include (x-1) > ** There is a similar issue with clean. If Job 2 is a long-running commit > (that was stuck/delayed for a while before creating its .requested plan) and > Job 1 is a clean, then Job 1 can perform a clean that updates the > earliest-commit-to-retain without waiting for the inflight instant by Job 2 > at (x-1) to complete. This causes Job 2 to be "skipped" by clean. > [Edit] I added a diagram to visualize the issue, specifically the second > scenario with clean > !Flowchart (1).png! 
> > One way this can be resolved is by combining the operations of generating > the instant time and creating a requested file in the same HUDI table > transaction. Specifically, executing the following steps whenever any instant > (commit, table service, etc.) is scheduled > # Acquire table lock > # Look at the latest instant C on the active timeline (completed or not). > Generate a timestamp after C > # Create the plan and requested file using this new timestamp (that is > greater than C) > # Release table lock > Unfortunately this has the following drawbacks > * Every operation must now hold the table lock when computing its plan, even > if it's an expensive operation and will take a while > * Users of HUDI cannot easily set their own instant time of an operation, > and this restriction would break any public APIs that allow this > -An alternate approach (suggested by- [~pwason] -) was to instead have all > operations including table services perform conflict resolution checks before > committing. For example, clean and compaction would generate their plan as > usual. But when creating a transaction to write a .requested file, right > before creating the file they should check if another lower-timestamp instant > has appeared in the timeline. And if so, they should fail/abort without > creating the plan. Commit operations would also be updated/verified to have a > similar check: before creating a .requested file (during a transaction) the > commit operation will check if a table service plan (clean/compact) with a > greater instant time has been created. And if so, it would abort/fail. This > avoids the drawbacks of the first approach, but will lead to more transient > failures that users have to handle.- > > An alternate approach is to have every operation abort creating a .requested > file unless it has the latest timestamp. 
Specifically, for any instant type, > whenever an operation is about to create a .requested plan on the timeline, it > should take the table lock and assert that there are no other instants on the > timeline (inflight or otherwise) that are greater than it. If that assertion > fails, then throw a retryable conflict resolution exception. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (HUDI-4699) Primary key-less data model
[ https://issues.apache.org/jira/browse/HUDI-4699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan resolved HUDI-4699. --- > Primary key-less data model > --- > > Key: HUDI-4699 > URL: https://issues.apache.org/jira/browse/HUDI-4699 > Project: Apache Hudi > Issue Type: Epic > Components: writer-core >Reporter: Sagar Sumit >Priority: Major > Labels: pull-request-available > > Hudi requires users to specify a primary key field. Can we do away with this > requirement? This epic tracks the work to support use cases which do not > require primary key based data modelling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (HUDI-4699) Primary key-less data model
[ https://issues.apache.org/jira/browse/HUDI-4699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan closed HUDI-4699. - Fix Version/s: 0.14.0 Resolution: Fixed > Primary key-less data model > --- > > Key: HUDI-4699 > URL: https://issues.apache.org/jira/browse/HUDI-4699 > Project: Apache Hudi > Issue Type: Epic > Components: writer-core >Reporter: Sagar Sumit > Assignee: sivabalan narayanan >Priority: Major > Labels: pull-request-available > Fix For: 0.14.0 > > > Hudi requires users to specify a primary key field. Can we do away with this > requirement? This epic tracks the work to support use cases which do not > require primary key based data modelling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (HUDI-4699) Primary key-less data model
[ https://issues.apache.org/jira/browse/HUDI-4699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reopened HUDI-4699: --- Assignee: sivabalan narayanan > Primary key-less data model > --- > > Key: HUDI-4699 > URL: https://issues.apache.org/jira/browse/HUDI-4699 > Project: Apache Hudi > Issue Type: Epic > Components: writer-core >Reporter: Sagar Sumit > Assignee: sivabalan narayanan >Priority: Major > Labels: pull-request-available > > Hudi requires users to specify a primary key field. Can we do away with this > requirement? This epic tracks the work to support use cases which do not > require primary key based data modelling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
(hudi) branch master updated: [HUDI-7557] Fix incremental cleaner when commit for savepoint removed (#10946)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 9efced37f81 [HUDI-7557] Fix incremental cleaner when commit for savepoint removed (#10946) 9efced37f81 is described below commit 9efced37f819ae59b51099ee43dc75e1a876a855 Author: Sagar Sumit AuthorDate: Mon Apr 1 23:00:19 2024 +0530 [HUDI-7557] Fix incremental cleaner when commit for savepoint removed (#10946) --- .../hudi/table/action/clean/CleanPlanner.java | 1 + .../apache/hudi/table/action/TestCleanPlanner.java | 89 -- 2 files changed, 51 insertions(+), 39 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 48ec8f9baa1..753f8c8253d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -245,6 +245,7 @@ public class CleanPlanner implements Serializable { Option instantOption = hoodieTable.getCompletedCommitsTimeline().filter(instant -> instant.getTimestamp().equals(savepointCommit)).firstInstant(); if (!instantOption.isPresent()) { LOG.warn("Skipping to process a commit for which savepoint was removed as the instant moved to archived timeline already"); +return Stream.empty(); } HoodieInstant instant = instantOption.get(); return getPartitionsForInstants(instant); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java index 8052572fcea..9989273b723 100644 --- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java @@ -138,14 +138,14 @@ public class TestCleanPlanner { void testPartitionsForIncrCleaning(HoodieWriteConfig config, String earliestInstant, String lastCompletedTimeInLastClean, String lastCleanInstant, String earliestInstantsInLastClean, List partitionsInLastClean, Map> savepointsTrackedInLastClean, Map> activeInstantsPartitions, - Map> savepoints, List expectedPartitions) throws IOException { + Map> savepoints, List expectedPartitions, boolean areCommitsForSavepointsRemoved) throws IOException { HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class); when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline); // setup savepoint mocks Set savepointTimestamps = savepoints.keySet().stream().collect(Collectors.toSet()); when(mockHoodieTable.getSavepointTimestamps()).thenReturn(savepointTimestamps); if (!savepoints.isEmpty()) { - for (Map.Entry> entry: savepoints.entrySet()) { + for (Map.Entry> entry : savepoints.entrySet()) { Pair> savepointMetadataOptionPair = getSavepointMetadata(entry.getValue()); HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, entry.getKey()); when(activeTimeline.getInstantDetails(instant)).thenReturn(savepointMetadataOptionPair.getRight()); @@ -156,7 +156,7 @@ public class TestCleanPlanner { Pair> cleanMetadataOptionPair = getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant, earliestInstantsInLastClean, lastCompletedTimeInLastClean, savepointsTrackedInLastClean.keySet()); mockLastCleanCommit(mockHoodieTable, lastCleanInstant, earliestInstantsInLastClean, activeTimeline, cleanMetadataOptionPair); -mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, savepointsTrackedInLastClean); +mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, 
savepointsTrackedInLastClean, areCommitsForSavepointsRemoved); // Trigger clean and validate partitions to clean. CleanPlanner cleanPlanner = new CleanPlanner<>(context, mockHoodieTable, config); @@ -332,7 +332,7 @@ public class TestCleanPlanner { static Stream keepLatestByHoursOrCommitsArgsIncrCleanPartitions() { String earliestInstant = "20231204194919610"; -String earliestInstantPlusTwoDays = "20231206194919610"; +String earliestInstantPlusTwoDays = "20231206194919610"; String lastCleanInstant = earliestInstantPlusTwoDays; String earliestInstantMinusThreeDays = "20231201194919610"; String e
[jira] [Created] (HUDI-7556) Fix MDT validator to account for additional partitions in MDT
sivabalan narayanan created HUDI-7556: - Summary: Fix MDT validator to account for additional partitions in MDT Key: HUDI-7556 URL: https://issues.apache.org/jira/browse/HUDI-7556 Project: Apache Hudi Issue Type: Bug Components: metadata Reporter: sivabalan narayanan There is a chance that the MDT could list additional partitions when compared to the FS-based listing. The reason is: we load the active timeline from the meta client and poll the FS-based listing for completed commits, and then we poll the MDT for the list of all partitions. In between these two steps, a commit could have completed, and hence the MDT could be serving that as well. So, let's account for that in our validation tool. -- This message was sent by Atlassian Jira (v8.20.10#820010)
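Assuming the validator ultimately compares two partition sets, the tolerant check described above might look like this sketch (names are hypothetical, not the actual MDT validator API): only partitions present in the FS listing but absent from the MDT should fail validation, since the MDT may legitimately run ahead.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for the validator: the MDT may list extra partitions
// (a commit can complete between the FS listing and the MDT poll), so only
// partitions the FS sees but the MDT misses indicate a real inconsistency.
class MdtPartitionValidator {
  static Set<String> missingFromMdt(Set<String> fsPartitions, Set<String> mdtPartitions) {
    Set<String> missing = new HashSet<>(fsPartitions);
    missing.removeAll(mdtPartitions); // extra MDT-only partitions are tolerated
    return missing;
  }
}
```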
(hudi) branch master updated: [HUDI-7500] fix gaps with deduce schema and null schema (#10858)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 136d0755ad7 [HUDI-7500] fix gaps with deduce schema and null schema (#10858) 136d0755ad7 is described below commit 136d0755ad7f86d2f4b7b4813a59096c233055e7 Author: Jon Vexler AuthorDate: Wed Mar 27 17:27:27 2024 -0400 [HUDI-7500] fix gaps with deduce schema and null schema (#10858) - Co-authored-by: Jonathan Vexler <=> --- .../main/scala/org/apache/hudi/DefaultSource.scala | 7 +- .../utilities/streamer/SourceFormatAdapter.java| 2 +- .../apache/hudi/utilities/streamer/StreamSync.java | 51 -- .../deltastreamer/TestHoodieDeltaStreamer.java | 4 +- .../streamer/TestStreamSyncUnitTests.java | 192 + 5 files changed, 241 insertions(+), 15 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index c346f7665df..be3d2f4ed4b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -74,7 +74,12 @@ class DefaultSource extends RelationProvider override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { try { - createRelation(sqlContext, parameters, null) + val relation = createRelation(sqlContext, parameters, null) + if (relation.schema.isEmpty) { +new EmptyRelation(sqlContext, new StructType()) + } else { +relation + } } catch { case _: HoodieSchemaNotFoundException => new EmptyRelation(sqlContext, new StructType()) case e => throw e diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java index f29404701db..1796c96dab8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java @@ -62,7 +62,7 @@ import static org.apache.hudi.utilities.streamer.BaseErrorTableWriter.ERROR_TABL /** * Adapts data-format provided by the source to the data-format required by the client (DeltaStreamer). */ -public final class SourceFormatAdapter implements Closeable { +public class SourceFormatAdapter implements Closeable { private final Source source; private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index 1453e9fd07c..ded5348ed8f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -55,6 +55,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; @@ -255,6 +256,31 @@ public class StreamSync implements Serializable, Closeable { private final boolean useRowWriter; + @VisibleForTesting + StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, + TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, + Function onInitializingHoodieWriteClient, SchemaProvider userProvidedSchemaProvider, + Option errorTableWriter, 
SourceFormatAdapter formatAdapter, Option transformer, + boolean useRowWriter, boolean autoGenerateRecordKeys) { +this.cfg = cfg; +this.hoodieSparkContext = hoodieSparkContext; +this.sparkSession = sparkSession; +this.fs = fs; +this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient; +this.props = props; +this.userProvidedSchemaProvider = userProvidedSchemaProvider; +this.processedSchema = new SchemaSet(); +this.autoGenerateRecordKeys = autoGenerateRecordKeys; +this.keyGenClassName = getKeyGeneratorClassName(new TypedProperties(props)); +this.conf = conf; + +this.errorTableWriter = errorTableWriter; +this.formatAdapter = formatAdapter; +this.transformer = transformer; +this.
(hudi) branch master updated: [HUDI-7518] Fix HoodieMetadataPayload merging logic around repeated deletes (#10913)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 8a137631da8 [HUDI-7518] Fix HoodieMetadataPayload merging logic around repeated deletes (#10913) 8a137631da8 is described below commit 8a137631da86c0ee99cf847b317484414ca4b7a2 Author: Y Ethan Guo AuthorDate: Tue Mar 26 19:13:09 2024 -0700 [HUDI-7518] Fix HoodieMetadataPayload merging logic around repeated deletes (#10913) --- .../common/testutils/HoodieMetadataTestTable.java | 11 ++ .../functional/TestHoodieBackedTableMetadata.java | 126 - .../hudi/metadata/HoodieMetadataPayload.java | 53 + .../hudi/common/testutils/HoodieTestTable.java | 13 +++ .../hudi/metadata/TestHoodieMetadataPayload.java | 87 -- 5 files changed, 254 insertions(+), 36 deletions(-) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java index 3bcba72eb68..b1974ff3e4e 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.testutils; import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -134,6 +135,16 @@ public class HoodieMetadataTestTable extends HoodieTestTable { return cleanMetadata; } + @Override + public void repeatClean(String cleanCommitTime, + HoodieCleanerPlan cleanerPlan, + HoodieCleanMetadata cleanMetadata) throws 
IOException { +super.repeatClean(cleanCommitTime, cleanerPlan, cleanMetadata); +if (writer != null) { + writer.update(cleanMetadata, cleanCommitTime); +} + } + public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception { super.addCompaction(instantTime, commitMetadata); if (writer != null) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index 1a268675ac7..8ca0d4e16a9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -19,10 +19,14 @@ package org.apache.hudi.client.functional; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; @@ -32,8 +36,12 @@ import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieDataBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import 
org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieWriteConfig; @@ -43,7 +51,6 @@ import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieMetadataLogRecordReader; import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator; -import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.table.HoodieSparkTable; import
[jira] [Created] (HUDI-7549) Data inconsistency issue w/ spurious log block detection
sivabalan narayanan created HUDI-7549: - Summary: Data inconsistency issue w/ spurious log block detection Key: HUDI-7549 URL: https://issues.apache.org/jira/browse/HUDI-7549 Project: Apache Hudi Issue Type: Bug Components: reader-core Reporter: sivabalan narayanan We added support to detect spurious log blocks in the log block reader ([https://github.com/apache/hudi/pull/9545], [https://github.com/apache/hudi/pull/9611]) in 0.14.0. Apparently there are some cases where it could lead to data loss or data consistency issues: [https://github.com/apache/hudi/pull/9611#issuecomment-2016687160] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7532) Fix schedule compact to only consider DCs after last compaction commit
sivabalan narayanan created HUDI-7532: - Summary: Fix schedule compact to only consider DCs after last compaction commit Key: HUDI-7532 URL: https://issues.apache.org/jira/browse/HUDI-7532 Project: Apache Hudi Issue Type: Bug Components: compaction Reporter: sivabalan narayanan Fix compaction scheduling to only consider delta commits (DCs) after the last compaction commit. As of now, it also considers replace commits. -- This message was sent by Atlassian Jira (v8.20.10#820010)
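A sketch of the intended scheduling filter (a simplified instant model using Java 16+ records, not Hudi's actual timeline API): pick only delta commits whose timestamps are strictly greater than the last completed compaction, which by construction also excludes replace commits.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified timeline filter: schedule compaction only over delta commits
// strictly after the last completed compaction; replace commits never qualify.
class CompactionScheduler {
  record Instant(String timestamp, String action) {}

  static List<Instant> deltaCommitsSinceLastCompaction(List<Instant> timeline) {
    String lastCompaction = timeline.stream()
        .filter(i -> i.action().equals("commit")) // completed compactions are assumed to land as "commit"
        .map(Instant::timestamp)
        .max(String::compareTo)
        .orElse(""); // no compaction yet: consider all delta commits
    return timeline.stream()
        .filter(i -> i.action().equals("deltacommit")) // excludes "replacecommit"
        .filter(i -> i.timestamp().compareTo(lastCompaction) > 0)
        .collect(Collectors.toList());
  }
}
```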
[jira] [Created] (HUDI-7528) Fix RowCustomColumnsSortPartitioner to use repartition instead of coalesce
sivabalan narayanan created HUDI-7528: - Summary: Fix RowCustomColumnsSortPartitioner to use repartition instead of coalesce Key: HUDI-7528 URL: https://issues.apache.org/jira/browse/HUDI-7528 Project: Apache Hudi Issue Type: Bug Components: writer-core Reporter: sivabalan narayanan Fix RowCustomColumnsSortPartitioner to use repartition instead of coalesce -- This message was sent by Atlassian Jira (v8.20.10#820010)
(hudi) branch master updated: [MINOR] Remove redundant fileId from HoodieAppendHandle (#10901)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 135db099afc [MINOR] Remove redundant fileId from HoodieAppendHandle (#10901) 135db099afc is described below commit 135db099afc99ad12c758c5428233614cf14a49a Author: wombatu-kun AuthorDate: Fri Mar 22 03:12:01 2024 +0700 [MINOR] Remove redundant fileId from HoodieAppendHandle (#10901) Co-authored-by: Vova Kolmakov --- .../src/main/java/org/apache/hudi/io/HoodieAppendHandle.java| 2 -- 1 file changed, 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 8d0eb2305e6..93df86e170d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -89,7 +89,6 @@ public class HoodieAppendHandle extends HoodieWriteHandle recordList = new ArrayList<>(); @@ -158,7 +157,6 @@ public class HoodieAppendHandle extends HoodieWriteHandle();
[jira] [Created] (HUDI-7526) Fix constructors for all bulk insert sort partitioners to ensure we could use it as user defined partitioners
sivabalan narayanan created HUDI-7526: - Summary: Fix constructors for all bulk insert sort partitioners to ensure we could use it as user defined partitioners Key: HUDI-7526 URL: https://issues.apache.org/jira/browse/HUDI-7526 Project: Apache Hudi Issue Type: Bug Components: writer-core Reporter: sivabalan narayanan Our constructor for a user-defined sort partitioner takes in the write config, while some of the partitioners used in the out-of-the-box sort modes do not account for it. Let's fix the sort partitioners to ensure any of them can be used as a user-defined partitioner. For example, NoneSortMode does not have a constructor that takes in the write config. -- This message was sent by Atlassian Jira (v8.20.10#820010)
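One way to support both conventions, sketched here with a hypothetical `WriteConfig` stand-in (not Hudi's actual loader or `HoodieWriteConfig`), is to try the config-taking constructor first and fall back to the no-arg one via reflection:

```java
import java.lang.reflect.Constructor;

// Reflection-based loader sketch: prefer a (WriteConfig) constructor, fall
// back to no-arg, so partitioners like a no-arg "none" sort mode still load
// when supplied as a user-defined partitioner class name.
class PartitionerLoader {
  static class WriteConfig {} // stand-in for the write config type

  static Object load(String className, WriteConfig config) throws Exception {
    Class<?> clazz = Class.forName(className);
    try {
      Constructor<?> ctor = clazz.getConstructor(WriteConfig.class);
      return ctor.newInstance(config);
    } catch (NoSuchMethodException e) {
      return clazz.getConstructor().newInstance(); // no-arg fallback
    }
  }
}
```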
[jira] [Comment Edited] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828123#comment-17828123 ] sivabalan narayanan edited comment on HUDI-7507 at 3/19/24 1:12 AM: Just trying to replay the same scenario for data table, conflict resolution could have aborted job2. and hence we may not hit the same issue. was (Author: shivnarayan): Just trying to replay the same scenario for data table, conflict resolution could have aborted job2. and hence we may not hit the same issue. > ongoing concurrent writers with smaller timestamp can cause issues with > table services > --- > > Key: HUDI-7507 > URL: https://issues.apache.org/jira/browse/HUDI-7507 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Krishen Bhan >Priority: Major > > Although HUDI operations hold a table lock when creating a .requested > instant, because HUDI writers do not generate a timestamp and create a > .requested plan in the same transaction, there can be a scenario where > # Job 1 starts, chooses timestamp (x), Job 2 starts and chooses timestamp > (x - 1) > # Job 1 schedules and creates a requested file with instant timestamp (x) > # Job 2 schedules and creates a requested file with instant timestamp (x-1) > # Both jobs continue running > If one job is writing a commit and the other is a table service, this can > cause issues: > * > ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then > Job 1 can run before Job 2 and create a compaction plan for all instant > times (up to (x)) that doesn’t include instant time (x-1). Later Job 2 > will create instant time (x-1), but the timeline will be in a corrupted state > since the compaction plan was supposed to include (x-1) > ** There is a similar issue with clean. 
If Job 2 is a long-running commit > (that was stuck/delayed for a while before creating its .requested plan) and > Job 1 is a clean, then Job 1 can perform a clean that updates the > earliest-commit-to-retain without waiting for the inflight instant by Job 2 > at (x-1) to complete. This causes Job 2 to be "skipped" by clean. > One way this can be resolved is by combining the operations of generating > the instant time and creating a requested file in the same HUDI table > transaction. Specifically, executing the following steps whenever any instant > (commit, table service, etc.) is scheduled > # Acquire table lock > # Look at the latest instant C on the active timeline (completed or not). > Generate a timestamp after C > # Create the plan and requested file using this new timestamp (that is > greater than C) > # Release table lock > Unfortunately this has the following drawbacks > * Every operation must now hold the table lock when computing its plan, even > if it's an expensive operation and will take a while > * Users of HUDI cannot easily set their own instant time of an operation, > and this restriction would break any public APIs that allow this > An alternate approach (suggested by [~pwason] ) was to instead have all > operations including table services perform conflict resolution checks before > committing. For example, clean and compaction would generate their plan as > usual. But when creating a transaction to write a .requested file, right > before creating the file they should check if another lower-timestamp instant > has appeared in the timeline. And if so, they should fail/abort without > creating the plan. Commit operations would also be updated/verified to have a > similar check: before creating a .requested file (during a transaction) the > commit operation will check if a table service plan (clean/compact) with a > greater instant time has been created. And if so, it would abort/fail. 
This > avoids the drawbacks of the first approach, but will lead to more transient > failures that users have to handle. -- This message was sent by Atlassian Jira (v8.20.10#820010)
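The first proposed approach above (generate the instant time and create the requested file under one table lock) can be sketched roughly as follows. This is a hedged illustration with made-up names (`InstantTimeGen`, `generateInstantAfter`), not Hudi's actual timeline API; it only shows how a timestamp strictly greater than the latest instant C can be produced while the lock is held.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative sketch: while holding the table lock, generate an instant time
// strictly greater than the latest instant already on the timeline.
public class InstantTimeGen {
    // Hudi-style 17-digit instant format (assumption for this sketch)
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    // latestInstant may be null when the timeline is empty
    public static String generateInstantAfter(String latestInstant) {
        String now = LocalDateTime.now().format(FMT);
        if (latestInstant == null || now.compareTo(latestInstant) > 0) {
            return now;
        }
        // Clock skew or same-millisecond collision: bump the latest instant by
        // one so the new timestamp stays strictly monotonic.
        return String.valueOf(Long.parseLong(latestInstant) + 1);
    }

    public static void main(String[] args) {
        String t1 = generateInstantAfter(null);
        String t2 = generateInstantAfter(t1);
        System.out.println(t2.compareTo(t1) > 0); // prints true
    }
}
```

With this scheme, a Job 2 can never pick a timestamp below an already-requested Job 1, which is exactly the ordering violation described in the issue.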
[jira] [Commented] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828142#comment-17828142 ] sivabalan narayanan commented on HUDI-7507: --- We have already fixed this in latest master (1.0) by generating the new commit times under locks. That should solve the issue. We can apply the same fix to the 0.X branch. > ongoing concurrent writers with smaller timestamp can cause issues with > table services > --- > > Key: HUDI-7507 > URL: https://issues.apache.org/jira/browse/HUDI-7507 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Krishen Bhan >Priority: Major > > Although HUDI operations hold a table lock when creating a .requested > instant, because HUDI writers do not generate a timestamp and create a > .requested plan in the same transaction, there can be a scenario where > # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp > (x - 1) > # Job 1 schedules and creates a requested file with instant timestamp (x) > # Job 2 schedules and creates a requested file with instant timestamp (x-1) > # Both jobs continue running > If one job is writing a commit and the other is a table service, this can > cause issues: > * > ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then > Job 1 can run before Job 2 and create a compaction plan for all instant > times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 > will create instant time (x-1), but the timeline will be in a corrupted state > since the compaction plan was supposed to include (x-1) > ** There is a similar issue with clean. If Job 2 is a long-running commit > (that was stuck/delayed for a while before creating its .requested plan) and > Job 1 is a clean, then Job 1 can perform a clean that updates the > earliest-commit-to-retain without waiting for the inflight instant by Job 2 > at (x-1) to complete. This causes Job 2 to be "skipped" by clean. 
> One way this can be resolved is by combining the operations of generating an > instant time and creating a requested file into the same HUDI table > transaction. Specifically, executing the following steps whenever any instant > (commit, table service, etc.) is scheduled: > # Acquire the table lock > # Look at the latest instant C on the active timeline (completed or not). > Generate a timestamp after C > # Create the plan and requested file using this new timestamp (that is > greater than C) > # Release the table lock > Unfortunately, this has the following drawbacks: > * Every operation must now hold the table lock while computing its plan, even > if it's an expensive operation that will take a while > * Users of HUDI cannot easily set the instant time of an operation themselves, > and this restriction would break any public APIs that allow it > An alternate approach (suggested by [~pwason] ) was to instead have all > operations, including table services, perform conflict resolution checks before > committing. For example, clean and compaction would generate their plans as > usual. But when creating a transaction to write a .requested file, right > before creating the file they should check whether another lower-timestamp instant > has appeared in the timeline, and if so, fail/abort without > creating the plan. Commit operations would also be updated to perform a > similar check: before creating a .requested file (during a transaction), the > commit operation checks whether a table service plan (clean/compact) with a > greater instant time has been created, and if so, aborts/fails. This > avoids the drawbacks of the first approach, but will lead to more transient > failures that users have to handle. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services
[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828123#comment-17828123 ] sivabalan narayanan commented on HUDI-7507: --- Replaying the same scenario for the data table: conflict resolution could have aborted Job 2, and hence we may not hit the same issue. > ongoing concurrent writers with smaller timestamp can cause issues with > table services > --- > > Key: HUDI-7507 > URL: https://issues.apache.org/jira/browse/HUDI-7507 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Krishen Bhan >Priority: Major > > Although HUDI operations hold a table lock when creating a .requested > instant, because HUDI writers do not generate a timestamp and create a > .requested plan in the same transaction, there can be a scenario where > # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp > (x - 1) > # Job 1 schedules and creates a requested file with instant timestamp (x) > # Job 2 schedules and creates a requested file with instant timestamp (x-1) > # Both jobs continue running > If one job is writing a commit and the other is a table service, this can > cause issues: > * > ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then > Job 1 can run before Job 2 and create a compaction plan for all instant > times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 > will create instant time (x-1), but the timeline will be in a corrupted state > since the compaction plan was supposed to include (x-1) > ** There is a similar issue with clean. If Job 2 is a long-running commit > (that was stuck/delayed for a while before creating its .requested plan) and > Job 1 is a clean, then Job 1 can perform a clean that updates the > earliest-commit-to-retain without waiting for the inflight instant by Job 2 > at (x-1) to complete. This causes Job 2 to be "skipped" by clean. 
> One way this can be resolved is by combining the operations of generating an > instant time and creating a requested file into the same HUDI table > transaction. Specifically, executing the following steps whenever any instant > (commit, table service, etc.) is scheduled: > # Acquire the table lock > # Look at the latest instant C on the active timeline (completed or not). > Generate a timestamp after C > # Create the plan and requested file using this new timestamp (that is > greater than C) > # Release the table lock > Unfortunately, this has the following drawbacks: > * Every operation must now hold the table lock while computing its plan, even > if it's an expensive operation that will take a while > * Users of HUDI cannot easily set the instant time of an operation themselves, > and this restriction would break any public APIs that allow it > An alternate approach (suggested by [~pwason] ) was to instead have all > operations, including table services, perform conflict resolution checks before > committing. For example, clean and compaction would generate their plans as > usual. But when creating a transaction to write a .requested file, right > before creating the file they should check whether another lower-timestamp instant > has appeared in the timeline, and if so, fail/abort without > creating the plan. Commit operations would also be updated to perform a > similar check: before creating a .requested file (during a transaction), the > commit operation checks whether a table service plan (clean/compact) with a > greater instant time has been created, and if so, aborts/fails. This > avoids the drawbacks of the first approach, but will lead to more transient > failures that users have to handle. -- This message was sent by Atlassian Jira (v8.20.10#820010)
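The alternate (conflict-resolution) approach above boils down to a last-moment check inside the transaction that writes the .requested file. A hedged sketch, with illustrative names (`RequestedFileGuard`, `shouldAbort`) rather than Hudi's real classes: abort if any instant that was not visible at planning time has appeared with a timestamp below ours.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the pre-commit conflict check: right before creating
// the .requested file (inside the table transaction), abort if a new instant
// with a smaller timestamp appeared on the timeline after planning.
public class RequestedFileGuard {
    public static boolean shouldAbort(Set<String> instantsSeenAtPlanning,
                                      Set<String> instantsOnTimelineNow,
                                      String myInstantTime) {
        for (String instant : instantsOnTimelineNow) {
            // A new, lower-timestamp instant means our plan may be incomplete,
            // so the operation should fail/abort and let the caller retry.
            if (!instantsSeenAtPlanning.contains(instant)
                    && instant.compareTo(myInstantTime) < 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<String> seen = new HashSet<>(Set.of("t1"));
        // t2 appeared after planning and sorts below t3 -> abort
        System.out.println(shouldAbort(seen, Set.of("t1", "t2"), "t3")); // prints true
    }
}
```

As the comment notes, this trades the global lock for more transient failures that callers must be prepared to retry.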
[jira] [Created] (HUDI-7511) Offset range calculation in kafka should return all topic partitions
sivabalan narayanan created HUDI-7511: - Summary: Offset range calculation in kafka should return all topic partitions Key: HUDI-7511 URL: https://issues.apache.org/jira/browse/HUDI-7511 Project: Apache Hudi Issue Type: Bug Components: deltastreamer Reporter: sivabalan narayanan After [https://github.com/apache/hudi/pull/10869] landed, we no longer return every topic partition in the final ranges. But for checkpointing purposes, every Kafka topic partition needs to appear in the final ranges, even if we are not consuming anything from it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
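The fix the issue asks for can be sketched as a post-processing step over the computed ranges. This is a hedged illustration with made-up types (`Range`, `fillMissingPartitions`), not `KafkaOffsetGen`'s real API: any partition that received no events is re-added as an empty `[offset, offset)` range so the checkpoint still covers all partitions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: ensure every topic partition appears in the final
// offset ranges, emitting an empty range for partitions with nothing to read.
public class OffsetRangeFill {
    static final class Range {
        final int partition; final long fromOffset; final long untilOffset;
        Range(int partition, long from, long until) {
            this.partition = partition; this.fromOffset = from; this.untilOffset = until;
        }
    }

    static List<Range> fillMissingPartitions(List<Range> computed, Map<Integer, Long> committed) {
        Set<Integer> present = new HashSet<>();
        for (Range r : computed) present.add(r.partition);
        List<Range> result = new ArrayList<>(computed);
        committed.forEach((partition, offset) -> {
            if (!present.contains(partition)) {
                // Nothing to consume, but the checkpoint still needs this
                // partition: carry its committed offset forward unchanged.
                result.add(new Range(partition, offset, offset));
            }
        });
        return result;
    }

    public static void main(String[] args) {
        List<Range> computed = new ArrayList<>(List.of(new Range(0, 0L, 5L)));
        Map<Integer, Long> committed = new HashMap<>(Map.of(0, 5L, 1, 7L));
        System.out.println(fillMissingPartitions(computed, committed).size()); // prints 2
    }
}
```

Without a step like this, a partition that is skipped by the allocation disappears from the checkpoint string, and the next run loses track of its committed offset.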
(hudi) branch master updated (ca2140e2003 -> 3c8488b831c)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git from ca2140e2003 [MINOR] rename KeyGenUtils#enableAutoGenerateRecordKeys (#10871) add 3c8488b831c [HUDI-7506] Compute offsetRanges based on eventsPerPartition allocated in each range (#10869) No new revisions were added by this update. Summary of changes: .../utilities/sources/helpers/KafkaOffsetGen.java | 88 ++- .../sources/helpers/TestCheckpointUtils.java | 167 - .../sources/helpers/TestKafkaOffsetGen.java| 10 +- 3 files changed, 179 insertions(+), 86 deletions(-)
(hudi) branch master updated (2af83e2d9a8 -> dc349f5293f)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git from 2af83e2d9a8 [HUDI-7411] Meta sync should consider cleaner commit (#10676) add dc349f5293f [ENG-6316] Bump cleaner retention for MDT (#537) (#10655) No new revisions were added by this update. Summary of changes: .../hudi/metadata/HoodieMetadataWriteUtils.java| 28 +++--- .../metadata/TestHoodieMetadataWriteUtils.java | 64 ++ 2 files changed, 84 insertions(+), 8 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
(hudi) branch asf-site updated: [DOCS] Updated inline and async process with more details (#10664)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/asf-site by this push: new 19734b58a62 [DOCS] Updated inline and async process with more details (#10664) 19734b58a62 is described below commit 19734b58a6230930fea2ad1bb748d5d6f6bb24c7 Author: nadine farah AuthorDate: Fri Mar 8 10:16:59 2024 -0800 [DOCS] Updated inline and async process with more details (#10664) --- website/docs/clustering.md | 18 ++ 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/website/docs/clustering.md b/website/docs/clustering.md index f61c61a4476..149b690ff3b 100644 --- a/website/docs/clustering.md +++ b/website/docs/clustering.md @@ -170,9 +170,14 @@ for inline or async clustering are shown below with code samples. ## Inline clustering -Inline clustering happens synchronously with the regular ingestion writer, which means the next round of ingestion -cannot proceed until the clustering is complete. Inline clustering can be setup easily using spark dataframe options. -See sample below +Inline clustering happens synchronously with the regular ingestion writer or as part of the data ingestion pipeline. This means the next round of ingestion cannot proceed until the clustering is complete With inline clustering, Hudi will schedule, plan clustering operations after each commit is completed and execute the clustering plans after it’s created. This is the simplest deployment model to run because it’s easier to manage than running different asynchronous Spark jobs. This mode [...] + +For this deployment mode, please enable and set: `hoodie.clustering.inline` + +To choose how often clustering is triggered, also set: `hoodie.clustering.inline.max.commits`. + +Inline clustering can be setup easily using spark dataframe options. 
+See sample below: ```scala import org.apache.hudi.QuickstartUtils._ @@ -202,7 +207,12 @@ df.write.format("org.apache.hudi"). ## Async Clustering -Async clustering runs the clustering table service in the background without blocking the regular ingestions writers. +Async clustering runs the clustering table service in the background without blocking the regular ingestions writers. There are three different ways to deploy an asynchronous clustering process: + +- **Asynchronous execution within the same process**: In this deployment mode, Hudi will schedule and plan the clustering operations after each commit is completed as part of the ingestion pipeline. Separately, Hudi spins up another thread within the same job and executes the clustering table service. This is supported by Spark Streaming, Flink and DeltaStreamer in continuous mode. For this deployment mode, please enable `hoodie.clustering.async.enabled` and `hoodie.clustering.async.max. [...] +- **Asynchronous scheduling and execution by a separate process**: In this deployment mode, the application will write data to a Hudi table as part of the ingestion pipeline. A separate clustering job will schedule, plan and execute the clustering operation. By running a different job for the clustering operation, it rebalances how Hudi uses compute resources: fewer compute resources are needed for the ingestion, which makes ingestion latency stable, and an independent set of compute res [...] +- **Scheduling inline and executing async**: In this deployment mode, the application ingests data and schedules the clustering in one job; in another, the application executes the clustering plan. The supported writers (see below) won’t be blocked from ingesting data. If the metadata table is enabled, a lock provider is not needed. However, if the metadata table is enabled, please ensure all jobs have the lock providers configured for concurrency control. All writers support this deploy [...] 
+ Hudi supports [multi-writers](https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing) which provides snapshot isolation between multiple table services, thus allowing writers to continue with ingestion while clustering runs in the background.
(hudi) branch master updated: [HUDI-7411] Meta sync should consider cleaner commit (#10676)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 2af83e2d9a8 [HUDI-7411] Meta sync should consider cleaner commit (#10676) 2af83e2d9a8 is described below commit 2af83e2d9a8fbb6cc33fdf29e38b72684c2da4ca Author: Sagar Sumit AuthorDate: Fri Mar 8 23:34:53 2024 +0530 [HUDI-7411] Meta sync should consider cleaner commit (#10676) --- .../hudi/common/table/timeline/TimelineUtils.java | 27 ++--- .../hudi/common/table/TestTimelineUtils.java | 46 -- .../sql/catalyst/catalog/HoodieCatalogTable.scala | 7 +--- .../apache/hudi/sync/common/HoodieSyncClient.java | 5 +-- 4 files changed, 67 insertions(+), 18 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index 5e710800d6f..dbe8f83fdbe 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -81,13 +81,15 @@ public class TimelineUtils { } /** - * Returns partitions that have been deleted or marked for deletion in the given timeline. + * Returns partitions that have been deleted or marked for deletion in the timeline between given commit time range. * Does not include internal operations such as clean in the timeline. */ - public static List getDroppedPartitions(HoodieTimeline timeline) { + public static List getDroppedPartitions(HoodieTableMetaClient metaClient, Option lastCommitTimeSynced, Option lastCommitCompletionTimeSynced) { +HoodieTimeline timeline = lastCommitTimeSynced.isPresent() +? 
TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced) +: metaClient.getActiveTimeline(); HoodieTimeline completedTimeline = timeline.getWriteTimeline().filterCompletedInstants(); HoodieTimeline replaceCommitTimeline = completedTimeline.getCompletedReplaceTimeline(); - Map partitionToLatestDeleteTimestamp = replaceCommitTimeline.getInstantsAsStream() .map(instant -> { try { @@ -102,6 +104,21 @@ public class TimelineUtils { .flatMap(pair -> pair.getRight().getPartitionToReplaceFileIds().keySet().stream() .map(partition -> new AbstractMap.SimpleEntry<>(partition, pair.getLeft().getTimestamp())) ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (existing, replace) -> replace)); +// cleaner could delete a partition when there are no active filegroups in the partition +HoodieTimeline cleanerTimeline = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants(); +cleanerTimeline.getInstantsAsStream() +.forEach(instant -> { + try { +HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(cleanerTimeline.getInstantDetails(instant).get()); +cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { + if (partitionMetadata.getIsPartitionDeleted()) { +partitionToLatestDeleteTimestamp.put(partition, instant.getTimestamp()); + } +}); + } catch (IOException e) { +throw new HoodieIOException("Failed to get partitions cleaned at " + instant, e); + } +}); if (partitionToLatestDeleteTimestamp.isEmpty()) { // There is no dropped partitions @@ -244,7 +261,7 @@ public class TimelineUtils { return false; } catch (IOException e) { - throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e); + throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePathV2().toString(), e); } } @@ -440,7 +457,7 @@ public class TimelineUtils { } public 
enum HollowCommitHandling { -FAIL, BLOCK, USE_TRANSITION_TIME; +FAIL, BLOCK, USE_TRANSITION_TIME } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index c81a05b4c20..d258753c3a8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -158,7 +158,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts); activeTimeline.createN
[jira] [Created] (HUDI-7491) Handle null extra metadata w/ clean commit metadata
sivabalan narayanan created HUDI-7491: - Summary: Handle null extra metadata w/ clean commit metadata Key: HUDI-7491 URL: https://issues.apache.org/jira/browse/HUDI-7491 Project: Apache Hudi Issue Type: Bug Components: cleaning Reporter: sivabalan narayanan [https://github.com/apache/hudi/pull/10651/] After this fix, older clean commits may not have any extra metadata. We need to handle a null value for the entire map. -- This message was sent by Atlassian Jira (v8.20.10#820010)
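The null handling the issue asks for amounts to treating a missing extra-metadata map the same as an empty one. A minimal hedged sketch (the class, method, and key names here are purely illustrative, not Hudi's actual clean metadata API):

```java
import java.util.Collections;
import java.util.Map;

// Illustrative sketch: clean commit metadata written by older releases may
// carry a null extra-metadata map, so readers should null-check the whole map
// before looking up any key.
public class CleanMetadataCompat {
    public static String getExtraMetadataValue(Map<String, String> extraMetadata, String key) {
        Map<String, String> safe = (extraMetadata == null) ? Collections.emptyMap() : extraMetadata;
        return safe.get(key);
    }

    public static void main(String[] args) {
        // Older clean commits: the entire map may be null rather than empty.
        System.out.println(getExtraMetadataValue(null, "savepointed_timestamps")); // prints null
    }
}
```

Centralizing the null check in one accessor keeps every caller from repeating the same defensive code.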
[jira] [Updated] (HUDI-7490) Fix archival guarding data files not yet cleaned up by cleaner when savepoint is removed
[ https://issues.apache.org/jira/browse/HUDI-7490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7490: -- Description: We added a fix recently where the cleaner will take care of cleaning up savepointed files too w/o fail with [https://github.com/apache/hudi/pull/10651] Scenario the above patch fixes: By default, incremental cleaning is enabled. The cleaner, during planning, will only account for partitions touched in recent commits (after the earliest commit to retain from the last completed clean). So, if there is a savepoint that is added and later removed, the cleaner might miss cleaning up the savepointed files. We fixed that gap in the above patch. Fix: Clean commit metadata will track savepointed commits. So, the next time the clean planner runs, we find the mismatch b/w the tracked savepointed commits and the current savepoints from the timeline, and if there is a difference, the cleaner will account for partitions touched by the savepointed commit. But we might have a gap wrt archival. If we ensure archival runs just after cleaning and not independently, we should be good. But otherwise there is a chance we could expose duplicate data to readers w/ the below scenario. Let's say we have a savepoint at t5.commit. So, the cleaner skipped deleting the files created at t5 and went past it. And say we have a replace commit at t10 which replaced all data files that were created at t5. W/ this state, say we removed the savepoint. We will have data files created by t5.commit in the data directory. As long as t10 is in the active timeline, readers will only see files written by t10 and will ignore files written by t5. At this juncture, if we run archival (w/o the cleaner), archival might archive t5 to t10, in which case both data files written by t5 and t10 will be exposed to readers. In most common deployment models, where we recommend stopping the pipeline while doing a savepoint-and-restore or deleting a savepoint, this might be uncommon. But there is a chance that this could happen. 
So, we have to guard the archival in this case. Essentially, we need to ensure before archiving a replace commit, the fileIds that were replaced are cleaned by the cleaner. Probable fix: We can follow similar approach we followed in [https://github.com/apache/hudi/pull/10651] . Essentially check for list of savepoints in current timeline and compare it w/ savepointed instants in latest clean commit metadata. If they match, we do not need to block archival. but if there is a difference (which means a savepoint was deleted in timeline and cleaner has not got a chance to cleanup yet), we should punt archiving anything and come back next time. was: We added a fix recently where cleaner will take care of cleaning up savepointed files too w/o fail with [https://github.com/apache/hudi/pull/10651] But we might have a gap wrt archival. If we ensure archival will run just after cleaning and not independently, we should be good. but if there is a chance we could expose duplicate data to readers w/ below scenario. lets say we have a savepoint at t5.commit. So, cleaner skipped to delete the files created at t5 and went past it. and say we have a replace commit at t10 which replaced all data files that were created at t5. w/ this state, say we removed the savepoint. we will have data files created by t5.commit in data directory. as long as t10 is in active timeline, readers will only see files written by t10 and will ignore files written by t5. at this juncture, if we run archival (w/o cleaner), archival might archive t5 to t10. on which case both data files written by t5 and t10 will be exposed to readers. In most common deployment models, where we recommend to stop the pipeline while doing savepoint and restore or deleting savepoint, this might be uncommon. but there is a chance that this could happen. So, we have to guard the archival in this case. Essentially, we need to ensure before archiving a replace commit, the fileIds that were replaced are cleaned by the cleaner. 
Probable fix: We can follow similar approach we followed in [https://github.com/apache/hudi/pull/10651] . Essentially check for list of savepoints in current timeline and compare it w/ savepointed instants in latest clean commit metadata. If they match, we do not need to block archival. but if there is a difference (which means a savepoint was deleted in timeline and cleaner has not got a chance to cleanup yet), we should punt archiving anything and come back next time. > Fix archival guarding data files not yet cleaned up by cleaner when savepoint > is removed > > > Key: HUDI-7490 > URL: https://issues.apache.org/jira/browse/HUDI-7490 >
[jira] [Updated] (HUDI-7490) Fix archival guarding data files not yet cleaned up by cleaner when savepoint is removed
[ https://issues.apache.org/jira/browse/HUDI-7490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-7490: -- Description: We added a fix recently where cleaner will take care of cleaning up savepointed files too w/o fail with [https://github.com/apache/hudi/pull/10651] But we might have a gap wrt archival. If we ensure archival will run just after cleaning and not independently, we should be good. but if there is a chance we could expose duplicate data to readers w/ below scenario. lets say we have a savepoint at t5.commit. So, cleaner skipped to delete the files created at t5 and went past it. and say we have a replace commit at t10 which replaced all data files that were created at t5. w/ this state, say we removed the savepoint. we will have data files created by t5.commit in data directory. as long as t10 is in active timeline, readers will only see files written by t10 and will ignore files written by t5. at this juncture, if we run archival (w/o cleaner), archival might archive t5 to t10. on which case both data files written by t5 and t10 will be exposed to readers. In most common deployment models, where we recommend to stop the pipeline while doing savepoint and restore or deleting savepoint, this might be uncommon. but there is a chance that this could happen. So, we have to guard the archival in this case. Essentially, we need to ensure before archiving a replace commit, the fileIds that were replaced are cleaned by the cleaner. Probable fix: We can follow similar approach we followed in [https://github.com/apache/hudi/pull/10651] . Essentially check for list of savepoints in current timeline and compare it w/ savepointed instants in latest clean commit metadata. If they match, we do not need to block archival. but if there is a difference (which means a savepoint was deleted in timeline and cleaner has not got a chance to cleanup yet), we should punt archiving anything and come back next time. 
was: We added a fix recently where cleaner will take care of cleaning up savepointed files too w/o fail with [https://github.com/apache/hudi/pull/10651] But we might have a gap wrt archival. If we ensure archival will run just after cleaning and not independently, we should be good. but if there is a chance we could expose duplicate data to readers w/ below scenario. lets say we have a savepoint at t5.commit. So, cleaner skipped to delete the files created at t5 and went past it. and say we have a replace commit at t10 which replaced all data files that were created at t5. w/ this state, say we removed the savepoint. we will have data files created by t5.commit in data directory. as long as t10 is in active timeline, readers will only see files written by t10 and will ignore files written by t5. at this juncture, if we run archival (w/o cleaner), archival might archive t5 to t10. on which case both data files written by t5 and t10 will be exposed to readers. So, we have to guard the archival in this case. Essentially, we need to ensure before archiving a replace commit, the fileIds that were replaced are cleaned by the cleaner. Probable fix: We can follow similar approach we followed in [https://github.com/apache/hudi/pull/10651] . Essentially check for list of savepoints in current timeline and compare it w/ savepointed instants in latest clean commit metadata. If they match, we do not need to block archival. but if there is a difference (which means a savepoint was deleted in timeline and cleaner has not got a chance to cleanup yet), we should punt archiving anything and come back next time. 
> Fix archival guarding data files not yet cleaned up by cleaner when savepoint > is removed > > > Key: HUDI-7490 > URL: https://issues.apache.org/jira/browse/HUDI-7490 > Project: Apache Hudi > Issue Type: Bug > Components: archiving, cleaning, clustering > Reporter: sivabalan narayanan >Priority: Major > > We added a fix recently where cleaner will take care of cleaning up > savepointed files too w/o fail with > [https://github.com/apache/hudi/pull/10651] > > But we might have a gap wrt archival. > If we ensure archival will run just after cleaning and not independently, we > should be good. > but if there is a chance we could expose duplicate data to readers w/ below > scenario. > > lets say we have a savepoint at t5.commit. So, cleaner skipped to delete the > files created at t5 and went past it. and say we have a replace commit at t10 > which replaced all data files that were created at t5. > w/ this state,
[jira] [Created] (HUDI-7490) Fix archival guarding data files not yet cleaned up by cleaner when savepoint is removed
sivabalan narayanan created HUDI-7490: - Summary: Fix archival guarding data files not yet cleaned up by cleaner when savepoint is removed Key: HUDI-7490 URL: https://issues.apache.org/jira/browse/HUDI-7490 Project: Apache Hudi Issue Type: Bug Components: archiving, cleaning, clustering Reporter: sivabalan narayanan We added a fix recently where the cleaner will take care of cleaning up savepointed files too w/o fail with [https://github.com/apache/hudi/pull/10651] But we might have a gap wrt archival. If we ensure archival runs just after cleaning and not independently, we should be good. But otherwise there is a chance we could expose duplicate data to readers w/ the below scenario. Let's say we have a savepoint at t5.commit. So, the cleaner skipped deleting the files created at t5 and went past it. And say we have a replace commit at t10 which replaced all data files that were created at t5. W/ this state, say we removed the savepoint. We will have data files created by t5.commit in the data directory. As long as t10 is in the active timeline, readers will only see files written by t10 and will ignore files written by t5. At this juncture, if we run archival (w/o the cleaner), archival might archive t5 to t10, in which case both data files written by t5 and t10 will be exposed to readers. So, we have to guard the archival in this case. Essentially, we need to ensure that before archiving a replace commit, the fileIds that were replaced have been cleaned by the cleaner. Probable fix: We can follow a similar approach to the one we followed in [https://github.com/apache/hudi/pull/10651]. Essentially, check the list of savepoints in the current timeline and compare it w/ the savepointed instants in the latest clean commit metadata. If they match, we do not need to block archival. But if there is a difference (which means a savepoint was deleted from the timeline and the cleaner has not yet had a chance to clean up), we should punt on archiving anything and come back next time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
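The probable fix described above reduces to a set comparison performed before archival runs. A hedged sketch with illustrative names (`ArchivalGuard`, `canProceedWithArchival`), not Hudi's actual archiver API:

```java
import java.util.Set;

// Illustrative sketch of the proposed guard: archival proceeds only when the
// savepoints currently on the timeline match the savepointed instants tracked
// in the latest clean commit metadata. Any mismatch means a savepoint was
// deleted but the cleaner has not yet caught up, so archival should punt on
// this round and retry after the next clean.
public class ArchivalGuard {
    public static boolean canProceedWithArchival(Set<String> savepointsOnTimeline,
                                                 Set<String> savepointsTrackedByLastClean) {
        return savepointsOnTimeline.equals(savepointsTrackedByLastClean);
    }

    public static void main(String[] args) {
        // Savepoint t5 was deleted from the timeline but the last clean still
        // tracks it: the cleaner owes work, so block archival for now.
        System.out.println(canProceedWithArchival(Set.of(), Set.of("t5"))); // prints false
    }
}
```

The same check deliberately errs on the side of archiving nothing: skipping one archival round is cheap, whereas archiving a replace commit whose replaced file groups are still on disk can expose duplicate data, as the scenario above shows.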
(hudi) branch master updated: [HUDI-7337] Implement MetricsReporter that reports metrics to M3 (#10565)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 6c7ee8ef210 [HUDI-7337] Implement MetricsReporter that reports metrics to M3 (#10565) 6c7ee8ef210 is described below commit 6c7ee8ef210ae7b410d6e8039a770b5cf27c4067 Author: Krishen <22875197+kb...@users.noreply.github.com> AuthorDate: Tue Mar 5 08:41:39 2024 -0800 [HUDI-7337] Implement MetricsReporter that reports metrics to M3 (#10565) - Co-authored-by: Krishen Bhan <“bkris...@uber.com”> --- hudi-client/hudi-client-common/pom.xml | 10 ++ .../org/apache/hudi/config/HoodieWriteConfig.java | 28 .../hudi/config/metrics/HoodieMetricsM3Config.java | 126 ++ .../hudi/metadata/HoodieMetadataWriteUtils.java| 10 ++ .../hudi/metrics/MetricsReporterFactory.java | 4 + .../apache/hudi/metrics/MetricsReporterType.java | 2 +- .../apache/hudi/metrics/m3/M3MetricsReporter.java | 120 + .../hudi/metrics/m3/M3ScopeReporterAdaptor.java| 145 + .../org/apache/hudi/metrics/m3/TestM3Metrics.java | 92 + packaging/hudi-flink-bundle/pom.xml| 6 + packaging/hudi-integ-test-bundle/pom.xml | 6 + packaging/hudi-kafka-connect-bundle/pom.xml| 6 + packaging/hudi-spark-bundle/pom.xml| 7 + packaging/hudi-utilities-bundle/pom.xml| 6 + packaging/hudi-utilities-slim-bundle/pom.xml | 6 + pom.xml| 12 +- 16 files changed, 584 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index 1778e2e98b4..9a4a7f2104b 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -120,6 +120,16 @@ io.prometheus simpleclient_pushgateway + + com.uber.m3 + tally-m3 + ${tally.version} + + + com.uber.m3 + tally-core + ${tally.version} + diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9447069a995..93691e8cdae 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -68,6 +68,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig; import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; +import org.apache.hudi.config.metrics.HoodieMetricsM3Config; import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; @@ -2238,6 +2239,26 @@ public class HoodieWriteConfig extends HoodieConfig { return getInt(HoodieMetricsGraphiteConfig.GRAPHITE_REPORT_PERIOD_IN_SECONDS); } + public String getM3ServerHost() { +return getString(HoodieMetricsM3Config.M3_SERVER_HOST_NAME); + } + + public int getM3ServerPort() { +return getInt(HoodieMetricsM3Config.M3_SERVER_PORT_NUM); + } + + public String getM3Tags() { +return getString(HoodieMetricsM3Config.M3_TAGS); + } + + public String getM3Env() { +return getString(HoodieMetricsM3Config.M3_ENV); + } + + public String getM3Service() { +return getString(HoodieMetricsM3Config.M3_SERVICE); + } + public String getJmxHost() { return getString(HoodieMetricsJmxConfig.JMX_HOST_NAME); } @@ -2745,6 +2766,7 @@ public class HoodieWriteConfig extends HoodieConfig { private boolean isPreCommitValidationConfigSet = false; private boolean isMetricsJmxConfigSet = false; private boolean isMetricsGraphiteConfigSet = false; +private boolean isMetricsM3ConfigSet = false; private boolean isLayoutConfigSet = false; public Builder withEngineType(EngineType engineType) { @@ -2984,6 +3006,12 @@ public class HoodieWriteConfig extends 
HoodieConfig { return this; } +public Builder withMetricsM3Config(HoodieMetricsM3Config metricsM3Config) { + writeConfig.getProps().putAll(metricsM3Config.getProps()); + isMetricsM3ConfigSet = true; + return this; +} + public Builder withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig validatorConfig) { writeConfig.getProps().putAll(validatorConfig.getProps()); isPreCommitValidationConfigSet = true; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/co
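To show how the new reporter wires together, here is a write-config fragment sketching how one might enable it. `hoodie.metrics.on` and `hoodie.metrics.reporter.type=M3` follow from the `MetricsReporterType` change in the diff; the `hoodie.metrics.m3.*` key strings below are assumptions inferred from the new getters (`getM3ServerHost`, `getM3ServerPort`, `getM3Tags`, `getM3Env`, `getM3Service`) — check `HoodieMetricsM3Config` for the authoritative names:

```properties
# Turn metrics on and select the new M3 reporter (enum value added in this commit).
hoodie.metrics.on=true
hoodie.metrics.reporter.type=M3

# Key names below are guesses derived from the getters in the diff,
# not confirmed by it; values are placeholders.
hoodie.metrics.m3.host=m3-collector.example.com
hoodie.metrics.m3.port=9052
hoodie.metrics.m3.env=production
hoodie.metrics.m3.service=hudi-writer
hoodie.metrics.m3.tags=team=data,region=us-east-1
```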
[jira] [Created] (HUDI-7478) Fix max delta commits guard check w/ MDT
sivabalan narayanan created HUDI-7478: - Summary: Fix max delta commits guard check w/ MDT Key: HUDI-7478 URL: https://issues.apache.org/jira/browse/HUDI-7478 Project: Apache Hudi Issue Type: Bug Components: metadata Reporter: sivabalan narayanan

protected static void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int maxNumDeltaCommitsWhenPending) {
  final HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
  Option<HoodieInstant> lastCompaction = activeTimeline.filterCompletedInstants()
      .filter(s -> s.getAction().equals(COMPACTION_ACTION)).lastInstant();
  int numDeltaCommits = lastCompaction.isPresent()
      ? activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().getTimestamp()).countInstants()
      : activeTimeline.getDeltaCommitTimeline().countInstants();
  if (numDeltaCommits > maxNumDeltaCommitsWhenPending) {
    throw new HoodieMetadataException(String.format("Metadata table's deltacommits exceeded %d: "
        + "this is likely caused by a pending instant in the data table. Resolve the pending instant "
        + "or adjust `%s`, then restart the pipeline.", maxNumDeltaCommitsWhenPending,
        HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key()));
  }
}

Here we filter on the action type "compaction". But a completed compaction instant will have "commit" as its action, so the filter never matches and we need to fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
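The bug and its fix can be sketched with a minimal stand-in model (hypothetical classes, not the real Hudi timeline API): because a completed compaction is recorded with action "commit", the corrected guard filters on "commit" when locating the last compaction, then counts delta commits after it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Minimal stand-in for the timeline (illustrative only, not Hudi's classes).
public class GuardCheck {

  static class Instant {
    final String action;     // "deltacommit", "commit", ...
    final String timestamp;
    final boolean completed;
    Instant(String action, String timestamp, boolean completed) {
      this.action = action;
      this.timestamp = timestamp;
      this.completed = completed;
    }
  }

  // Count delta commits after the last completed compaction. The fix is the
  // action filter: "commit" (how a completed compaction is recorded on the
  // metadata table timeline) instead of "compaction", which never matches.
  static int deltaCommitsSinceLastCompaction(List<Instant> timeline) {
    Optional<Instant> lastCompaction = timeline.stream()
        .filter(i -> i.completed && i.action.equals("commit"))
        .reduce((a, b) -> b); // keep the last matching instant
    return (int) timeline.stream()
        .filter(i -> i.action.equals("deltacommit"))
        .filter(i -> lastCompaction
            .map(c -> i.timestamp.compareTo(c.timestamp) > 0)
            .orElse(true)) // no compaction yet: count all delta commits
        .count();
  }

  public static void main(String[] args) {
    List<Instant> tl = Arrays.asList(
        new Instant("deltacommit", "001", true),
        new Instant("deltacommit", "002", true),
        new Instant("commit", "003", true),      // completed compaction
        new Instant("deltacommit", "004", true));
    System.out.println(deltaCommitsSinceLastCompaction(tl)); // prints 1
  }
}
```

With the old "compaction" filter, `lastCompaction` would be empty here and the count would be 3, tripping the guard even though a compaction just ran.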
[jira] [Created] (HUDI-7460) Fix compaction schedule with pending delta commits
sivabalan narayanan created HUDI-7460: - Summary: Fix compaction schedule with pending delta commits Key: HUDI-7460 URL: https://issues.apache.org/jira/browse/HUDI-7460 Project: Apache Hudi Issue Type: Improvement Components: compaction Reporter: sivabalan narayanan Hudi has a constraint that a compaction can be scheduled only if there are no pending delta commits whose instant time is earlier than the compaction instant being scheduled. We were throwing an exception when this condition is not met. We should fix the behavior so that, instead of throwing an exception, we return an empty plan when the condition is not met. -- This message was sent by Atlassian Jira (v8.20.10#820010)
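The proposed change can be sketched as follows. The names (`schedule`, the plan string) are illustrative, not the real Hudi scheduling API; the point is the control flow — an earlier pending delta commit now yields an empty plan rather than an exception:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the proposed behavior change.
public class CompactionScheduler {

  static Optional<String> schedule(String compactionInstant, List<String> pendingDeltaCommits) {
    boolean earlierPending = pendingDeltaCommits.stream()
        .anyMatch(t -> t.compareTo(compactionInstant) < 0);
    if (earlierPending) {
      // Previously: throw an exception. Proposed: quietly return no plan,
      // letting the caller retry on a later scheduling attempt.
      return Optional.empty();
    }
    return Optional.of("compaction-plan@" + compactionInstant);
  }

  public static void main(String[] args) {
    // Pending delta commit at 003 is earlier than compaction instant 005: no plan.
    System.out.println(schedule("005", List.of("003")).isPresent()); // prints false
    // Pending delta commit at 007 is later: scheduling proceeds.
    System.out.println(schedule("005", List.of("007")).isPresent()); // prints true
  }
}
```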
(hudi) branch master updated (89d267a4bf4 -> 6f7fe8a05a9)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git from 89d267a4bf4 [HUDI-5823] Partition ttl management (#9723) add 6f7fe8a05a9 [MINOR] Modify filter to allow removal of column stats from metadata table for bootstrap table files (#10238) No new revisions were added by this update. Summary of changes: .../main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
[jira] [Assigned] (HUDI-7429) Fix avg record size estimation for delta commits and replace commits
[ https://issues.apache.org/jira/browse/HUDI-7429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7429: - Assignee: sivabalan narayanan > Fix avg record size estimation for delta commits and replace commits > > > Key: HUDI-7429 > URL: https://issues.apache.org/jira/browse/HUDI-7429 > Project: Apache Hudi > Issue Type: Improvement > Components: writer-core >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > > The avg record size calculation only considers COMMIT for now. Let's fix it to > include delta commits and replace commits as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7429) Fix avg record size estimation for delta commits and replace commits
sivabalan narayanan created HUDI-7429: - Summary: Fix avg record size estimation for delta commits and replace commits Key: HUDI-7429 URL: https://issues.apache.org/jira/browse/HUDI-7429 Project: Apache Hudi Issue Type: Improvement Components: writer-core Reporter: sivabalan narayanan The avg record size calculation only considers COMMIT for now. Let's fix it to include delta commits and replace commits as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
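The fix amounts to widening the set of commit actions consulted when averaging record sizes. The following is a simplified, self-contained sketch with hypothetical names (Hudi's real estimator reads commit metadata from the timeline), showing the action set growing from `commit` alone to include `deltacommit` and `replacecommit`:

```java
import java.util.List;
import java.util.Set;

// Illustrative sketch; names and the fallback value are assumptions.
public class AvgRecordSize {

  // Before the fix this set was effectively just "commit".
  static final Set<String> SIZE_ESTIMATE_ACTIONS =
      Set.of("commit", "deltacommit", "replacecommit");
  static final long DEFAULT_SIZE = 1024; // fallback when nothing qualifies yet

  static class CommitStats {
    final String action;
    final long bytesWritten;
    final long recordsWritten;
    CommitStats(String action, long bytesWritten, long recordsWritten) {
      this.action = action;
      this.bytesWritten = bytesWritten;
      this.recordsWritten = recordsWritten;
    }
  }

  static long estimate(List<CommitStats> history) {
    long bytes = 0, records = 0;
    for (CommitStats c : history) {
      if (SIZE_ESTIMATE_ACTIONS.contains(c.action)) {
        bytes += c.bytesWritten;
        records += c.recordsWritten;
      }
    }
    return records == 0 ? DEFAULT_SIZE : bytes / records;
  }

  public static void main(String[] args) {
    List<CommitStats> history = List.of(
        new CommitStats("commit", 1000, 10),
        new CommitStats("deltacommit", 500, 10), // now counted
        new CommitStats("clean", 999, 1));       // still ignored
    System.out.println(estimate(history)); // prints 75
  }
}
```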
(hudi) branch master updated: [MINOR] Clarify config descriptions (#10681)
This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 9da1f2b15e2 [MINOR] Clarify config descriptions (#10681) 9da1f2b15e2 is described below commit 9da1f2b15e2bf873a7d3db56dbc0183479c38c4c Author: Bhavani Sudha Saktheeswaran <2179254+bhasu...@users.noreply.github.com> AuthorDate: Thu Feb 15 20:39:30 2024 -0800 [MINOR] Clarify config descriptions (#10681) This aligns with the doc change here: https://github.com/apache/hudi/pull/10680 --- .../src/main/scala/org/apache/hudi/DataSourceOptions.scala | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 99080629e17..47a7c61a60f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -500,7 +500,9 @@ object DataSourceWriteOptions { .defaultValue("false") .markAdvanced() .withDocumentation("If set to true, records from the incoming dataframe will not overwrite existing records with the same key during the write operation. " + - "This config is deprecated as of 0.14.0. Please use hoodie.datasource.insert.dup.policy instead."); + " **Note** Just for Insert operation in Spark SQL writing since 0.14.0, users can switch to the config `hoodie.datasource.insert.dup.policy` instead " + + "for a simplified duplicate handling experience. 
The new config will be incorporated into all other writing flows and this config will be fully deprecated " + + "in future releases."); val PARTITIONS_TO_DELETE: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.partitions.to.delete") @@ -597,7 +599,7 @@ object DataSourceWriteOptions { .withValidValues(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY) .markAdvanced() .sinceVersion("0.14.0") -.withDocumentation("When operation type is set to \"insert\", users can optionally enforce a dedup policy. This policy will be employed " +.withDocumentation("**Note** This is only applicable to Spark SQL writing.When operation type is set to \"insert\", users can optionally enforce a dedup policy. This policy will be employed " + " when records being ingested already exists in storage. Default policy is none and no action will be taken. Another option is to choose " + " \"drop\", on which matching records from incoming will be dropped and the rest will be ingested. Third option is \"fail\" which will " + "fail the write operation when same records are re-ingested. In other words, a given record as deduced by the key generation policy " +
[jira] [Assigned] (HUDI-7407) Add optional clean support to standalone compaction and clustering jobs
[ https://issues.apache.org/jira/browse/HUDI-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan reassigned HUDI-7407: - Assignee: sivabalan narayanan > Add optional clean support to standalone compaction and clustering jobs > --- > > Key: HUDI-7407 > URL: https://issues.apache.org/jira/browse/HUDI-7407 > Project: Apache Hudi > Issue Type: Improvement > Components: table-service >Reporter: sivabalan narayanan > Assignee: sivabalan narayanan >Priority: Major > Labels: pull-request-available > > Let's add a top-level config to the standalone compaction and clustering jobs to > optionally clean. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7407) Add optional clean support to standalone compaction and clustering jobs
sivabalan narayanan created HUDI-7407: - Summary: Add optional clean support to standalone compaction and clustering jobs Key: HUDI-7407 URL: https://issues.apache.org/jira/browse/HUDI-7407 Project: Apache Hudi Issue Type: Improvement Components: table-service Reporter: sivabalan narayanan Let's add a top-level config to the standalone compaction and clustering jobs to optionally clean. -- This message was sent by Atlassian Jira (v8.20.10#820010)
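The shape of the request is a simple opt-in flag on the standalone job. The sketch below is entirely hypothetical — the flag name and runner are placeholders, not the real Hudi CLI — and only illustrates the intended control flow of running clean after the main table service:

```java
import java.util.Arrays;

// Hypothetical sketch: an opt-in "--clean" flag on a standalone table
// service job that triggers clean after the main service completes.
public class StandaloneJob {

  static String run(String[] args) {
    boolean cleanAfter = Arrays.asList(args).contains("--clean");
    StringBuilder executed = new StringBuilder("compaction");
    if (cleanAfter) {
      executed.append("+clean"); // optionally clean in the same job run
    }
    return executed.toString();
  }

  public static void main(String[] args) {
    System.out.println(run(new String[]{"--clean"})); // prints compaction+clean
    System.out.println(run(new String[]{}));          // prints compaction
  }
}
```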