[jira] [Updated] (HUDI-7547) Simplification of archival, savepoint, cleaning interplays

2024-06-06 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7547:
--
Description: 
 

We are tackling the replace commit, savepoint, cleaner, archival data 
consistency issues here: https://issues.apache.org/jira/browse/HUDI-7779 

  was:
 

Related to https://issues.apache.org/jira/browse/HUDI-7779 


> Simplification of archival, savepoint, cleaning interplays
> --
>
> Key: HUDI-7547
> URL: https://issues.apache.org/jira/browse/HUDI-7547
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: archiving, cleaning, savepoint, table-service
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 1.0.0
>
>
>  
> We are tackling the replace commit, savepoint, cleaner, archival data 
> consistency issues here: https://issues.apache.org/jira/browse/HUDI-7779 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7547) Simplification of archival, savepoint, cleaning interplays

2024-06-06 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7547:
--
Description: 
 

Related to https://issues.apache.org/jira/browse/HUDI-7779 

> Simplification of archival, savepoint, cleaning interplays
> --
>
> Key: HUDI-7547
> URL: https://issues.apache.org/jira/browse/HUDI-7547
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: archiving, cleaning, savepoint, table-service
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 1.0.0
>
>
>  
> Related to https://issues.apache.org/jira/browse/HUDI-7779 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7779) Guarding archival to not archive unintended commits

2024-06-06 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7779:
--
Description: 
Archiving commits from the active timeline could lead to data consistency 
issues on rare occasions. We should come up with proper guards to ensure we do 
not perform such unintended archival. 

 

The major gap we want to guard against is:

If someone disables the cleaner, archival should account for data consistency 
issues and bail out.

We have a base guarding condition, where archival will stop at the earliest 
commit to retain based on the latest clean commit metadata. But there are a few 
other scenarios that need to be accounted for. 

 

a. Setting aside replace commits, let's dive into the specifics for regular 
commits and delta commits.

Say the user configured the cleaner to retain 4 commits and the archival 
configs to 5 and 6. After t10, the cleaner is supposed to clean up all file 
versions created at or before t6. Say the cleaner did not run (for whatever 
reason) for the next 5 commits.

    Archival will certainly be guarded until the earliest commit to retain 
based on the latest clean commit.

Corner case to consider: 

A savepoint was added at, say, t3 and later removed, and the cleaner was still 
never re-enabled. Even though archival would have been stopped at t3 (while the 
savepoint is present), once the savepoint is removed, if archival is executed, 
it could archive commit t3. This means the file versions tracked at t3 are 
still not cleaned up by the cleaner. 

Reasoning: 

We are good here w.r.t. data consistency. Until the cleaner runs next, these 
older file versions might be exposed to the end user. But time-travel queries 
are not intended for already cleaned up commits, so this is not an issue. None 
of snapshot, time-travel or incremental queries will run into issues, as they 
are not supposed to poll for t3. 

At any later point, if the cleaner is re-enabled, it will take care of cleaning 
up the file versions tracked at commit t3. It is just that, for the interim 
period, some older file versions might still be exposed to readers. 

 

b. The trickier part is when replace commits are involved. Since the replace 
commit metadata in the active timeline is what ensures the replaced file groups 
are ignored for reads, the cleaner is expected to clean them up fully before 
that metadata is archived. But are there chances this could go wrong? 

Corner case to consider: let's add onto the above scenario, where t3 has a 
savepoint and t4 is a replace commit which replaced file groups tracked in t3. 

The cleaner will skip cleaning up files tracked by t3 (due to the presence of 
the savepoint), but will clean up t4, t5 and t6. So the earliest commit to 
retain will be pointing to t6. Now say the savepoint for t3 is removed, but the 
cleaner stays disabled. In this state of the timeline, if archival is executed 
(since t3's savepoint is removed), archival might archive t3 and t4.rc. This 
could lead to data duplicates, as both the replaced file groups and the new 
file groups from t4.rc would be exposed as valid file groups. 

 

In other words, to summarize the different scenarios: 

i. The replaced file group is never cleaned up. 
    - ECTR (earliest commit to retain) is less than this replace commit, so we 
are good. 
ii. The replaced file group is cleaned up. 
    - ECTR is greater than this replace commit, so it is good to archive.
iii. Tricky: ECTR moved ahead of this replace commit, but due to the savepoint, 
the full clean-up did not happen. After the savepoint is removed and archival 
is executed, we should avoid archiving the replace commit of interest. This is 
the gap we don't account for as of now.

 

We have 3 options to solve this.

Option A: 

Let the savepoint deletion flow take care of cleaning up the files it is 
tracking. 

Cons:

Savepoint's responsibility is not to remove data files, so from a 
single-responsibility standpoint this may not be right. Also, this clean-up 
might need to do what the clean planner actually does, i.e. build the file 
system view, understand whether a file is already supposed to be cleaned up, 
and only then clean up the files which qualify. For example, if a file group 
has only one file slice, it should not be cleaned up, and other scenarios like 
this. 

 

Option B:

Since archival is the one that might cause data consistency issues, why not 
have archival do the clean-up? 

We would need to account for concurrent cleans, failure and retry scenarios, 
etc. Also, we might need to build the file system view and then decide whether 
something needs to be cleaned up before archiving it. 

Cons:

Again, the single-responsibility rule might be broken. It would be neater if 
the cleaner takes care of deleting data files and archival only takes care of 
deleting/archiving timeline files. 

 

Option C:

Similar to how the cleaner maintains EarliestCommitToRetain, let the cleaner 
track another metadata field named "EarliestCommitToArchive". Strictly 
speaking, ear
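
A minimal, hypothetical sketch of the Option C idea (the class and field names 
below are assumptions, not actual Hudi code): the cleaner publishes an 
"EarliestCommitToArchive" alongside the earliest commit to retain, and archival 
caps its boundary at whichever of the two is older.

{code:java}
// Hypothetical sketch of Option C; none of these classes/fields exist in Hudi
// as-is, they only illustrate the proposed guard.
import java.util.Optional;

class CleanMetadataSketch {
  // Tracked today: commits newer than this must not be cleaned.
  String earliestCommitToRetain;
  // Proposed: commits newer than this must not be archived, because their
  // replaced/older file versions are not yet confirmed as cleaned up.
  String earliestCommitToArchive;
}

class ArchivalGuardSketch {
  // Returns the instant (exclusive) up to which archival may proceed.
  static Optional<String> archivalBoundary(Optional<CleanMetadataSketch> latestClean) {
    if (!latestClean.isPresent()) {
      return Optional.empty(); // no completed clean yet: stay conservative
    }
    String ectr = latestClean.get().earliestCommitToRetain;
    String ecta = latestClean.get().earliestCommitToArchive;
    if (ecta == null) {
      return Optional.ofNullable(ectr); // older clean metadata without the new field
    }
    if (ectr == null) {
      return Optional.of(ecta);
    }
    // Hudi instant times are lexicographically ordered timestamps, so "min"
    // is a plain string comparison.
    return Optional.of(ecta.compareTo(ectr) < 0 ? ecta : ectr);
  }
}
{code}

With such a boundary, scenario iii above is covered: even after the savepoint 
at t3 is deleted, the replaced file groups of t4.rc are not yet confirmed as 
cleaned, so EarliestCommitToArchive stays at or before t4.rc and archival 
cannot archive that replace commit.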


(hudi) branch master updated (4224d872719 -> b5913c65147)

2024-06-06 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


from 4224d872719 [MINOR] Update DOAP with 0.15.0 Release (#11409)
 add b5913c65147 [MINOR] Add logs whenever fallback to fs listing even with 
MDT enabled (#11407)

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java| 1 +
 .../main/java/org/apache/hudi/metadata/HoodieTableMetadata.java| 7 ++-
 2 files changed, 7 insertions(+), 1 deletion(-)



[jira] [Assigned] (HUDI-7824) Fix incremental partitions fetch logic when savepoint is removed for Incr cleaner

2024-06-01 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7824:
-

Assignee: sivabalan narayanan

> Fix incremental partitions fetch logic when savepoint is removed for Incr 
> cleaner
> -
>
> Key: HUDI-7824
> URL: https://issues.apache.org/jira/browse/HUDI-7824
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: cleaning
>    Reporter: sivabalan narayanan
>Assignee: sivabalan narayanan
>Priority: Major
>  Labels: pull-request-available
>
> With the incremental cleaner, if a savepoint is blocking the clean-up of a 
> commit and the cleaner has moved ahead w.r.t. the earliest commit to retain, 
> then when the savepoint is removed later, the cleaner should account for 
> cleaning up the commit of interest. 
>  
> Let's ensure the clean planner accounts for all partitions when such a 
> savepoint removal is detected



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7824) Fix incremental partitions fetch logic when savepoint is removed for Incr cleaner

2024-05-31 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7824:
-

 Summary: Fix incremental partitions fetch logic when savepoint is 
removed for Incr cleaner
 Key: HUDI-7824
 URL: https://issues.apache.org/jira/browse/HUDI-7824
 Project: Apache Hudi
  Issue Type: Bug
  Components: cleaning
Reporter: sivabalan narayanan


With the incremental cleaner, if a savepoint is blocking the clean-up of a 
commit and the cleaner has moved ahead w.r.t. the earliest commit to retain, 
then when the savepoint is removed later, the cleaner should account for 
cleaning up the commit of interest. 

 

Let's ensure the clean planner accounts for all partitions when such a 
savepoint removal is detected
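
A minimal sketch of what "account for all partitions when such a savepoint 
removal is detected" could look like in the clean planner. The names below are 
illustrative assumptions, not the actual Hudi implementation: the idea is 
simply to compare the savepoints recorded in the last clean metadata against 
the savepoints currently on the timeline and, when one has disappeared, fall 
back from the incremental partition list to a full partition scan.

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: detect that a savepoint which existed at the time of the
// last clean has since been removed, and widen the partition scan accordingly.
class IncrementalCleanPlannerSketch {

  static boolean savepointRemovedSinceLastClean(Set<String> savepointsInLastClean,
                                                Set<String> savepointsOnTimeline) {
    Set<String> removed = new HashSet<>(savepointsInLastClean);
    removed.removeAll(savepointsOnTimeline);
    return !removed.isEmpty();
  }

  static List<String> partitionsToClean(Set<String> savepointsInLastClean,
                                        Set<String> savepointsOnTimeline,
                                        List<String> allPartitions,
                                        List<String> partitionsTouchedSinceLastClean) {
    if (savepointRemovedSinceLastClean(savepointsInLastClean, savepointsOnTimeline)) {
      // A savepoint that previously blocked cleaning is gone; the commit it
      // protected may touch partitions outside the incremental set, so scan all.
      return allPartitions;
    }
    // Normal incremental path: only partitions touched since the last clean.
    return partitionsTouchedSinceLastClean;
  }
}
{code}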



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Commented] (HUDI-7211) Relax need of ordering/precombine field for tables with autogenerated record keys for DeltaStreamer

2024-05-29 Thread sivabalan narayanan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850517#comment-17850517
 ] 

sivabalan narayanan commented on HUDI-7211:
---

For auto record key generation, you need to set the operation type to "INSERT". 
Can you give that a try? 
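
For reference, a minimal sketch of what "set the operation type to INSERT" 
means, using the Spark datasource write path and standard Hudi config keys 
(table name and paths are placeholders); with the Deltastreamer command quoted 
below, the equivalent change would be passing --op INSERT instead of --op 
UPSERT.

{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

// Sketch only: with auto-generated record keys (no record key field, no
// precombine/ordering field), write with the "insert" operation instead of the
// default "upsert".
public class AutoKeyGenInsertExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("hudi-autokeygen-insert").getOrCreate();
    Dataset<Row> df = spark.read().json("/tmp/source_without_ts");

    df.write().format("hudi")
        .option("hoodie.table.name", "stocks_auto_keygen")
        .option("hoodie.datasource.write.operation", "insert")          // not "upsert"
        .option("hoodie.datasource.write.partitionpath.field", "date")
        .mode(SaveMode.Append)
        .save("/tmp/hudi/stocks_auto_keygen");

    spark.stop();
  }
}
{code}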

> Relax need of ordering/precombine field for tables with autogenerated record 
> keys for DeltaStreamer
> ---
>
> Key: HUDI-7211
> URL: https://issues.apache.org/jira/browse/HUDI-7211
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: writer-core
>Reporter: Aditya Goenka
>Priority: Critical
> Fix For: 1.1.0
>
>
> [https://github.com/apache/hudi/issues/10233]
>  
> ```
> NOW=$(date '+%Y%m%dt%H%M%S')
> ${SPARK_HOME}/bin/spark-submit \
> --jars 
> ${path_prefix}/jars/${SPARK_V}/hudi-spark${SPARK_VERSION}-bundle_2.12-${HUDI_VERSION}.jar
>  \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> ${path_prefix}/jars/${SPARK_V}/hudi-utilities-slim-bundle_2.12-${HUDI_VERSION}.jar
>  \
> --target-base-path ${path_prefix}/testcases/stocks/data/target/${NOW} \
> --target-table stocks${NOW} \
> --table-type COPY_ON_WRITE \
> --base-file-format PARQUET \
> --props ${path_prefix}/testcases/stocks/configs/hoodie.properties \
> --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
> --schemaprovider-class 
> org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
> --hoodie-conf 
> hoodie.deltastreamer.schemaprovider.source.schema.file=${path_prefix}/testcases/stocks/data/schema_without_ts.avsc
>  \
> --hoodie-conf 
> hoodie.deltastreamer.schemaprovider.target.schema.file=${path_prefix}/testcases/stocks/data/schema_without_ts.avsc
>  \
> --op UPSERT \
> --spark-master yarn \
> --hoodie-conf 
> hoodie.deltastreamer.source.dfs.root=${path_prefix}/testcases/stocks/data/source_without_ts
>  \
> --hoodie-conf hoodie.datasource.write.partitionpath.field=date \
> --hoodie-conf hoodie.datasource.write.keygenerator.type=SIMPLE \
> --hoodie-conf hoodie.datasource.write.hive_style_partitioning=false \
> --hoodie-conf hoodie.metadata.enable=true
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (HUDI-7812) Async Clustering w/ row writer fails due to timetravel query validation

2024-05-29 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7812:
-

Assignee: sivabalan narayanan

> Async Clustering w/ row writer fails due to timetravel query validation 
> 
>
> Key: HUDI-7812
> URL: https://issues.apache.org/jira/browse/HUDI-7812
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: clustering
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>  Labels: pull-request-available
>
> With the clustering row-writer-enabled flow, we trigger a time-travel query to 
> read input records. But the query side fails if there are any pending commits 
> (due to new ingestion) whose timestamp < clustering instant time. We need to 
> relax this constraint. 
>  
> {code:java}
> Failed to execute CLUSTERING service
>     java.util.concurrent.CompletionException: 
> org.apache.hudi.exception.HoodieTimeTravelException: Time travel's timestamp 
> '20240406123837295' must be earlier than the first incomplete commit 
> timestamp '20240406123834233'.
>         at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>  ~[?:1.8.0_392-internal]
>         at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>  ~[?:1.8.0_392-internal]
>         at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>  ~[?:1.8.0_392-internal]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  ~[?:1.8.0_392-internal]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[?:1.8.0_392-internal]
>         at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392-internal]
>     Caused by: org.apache.hudi.exception.HoodieTimeTravelException: Time 
> travel's timestamp '20240406123837295' must be earlier than the first 
> incomplete commit timestamp '20240406123834233'.
>         at 
> org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf(TimelineUtils.java:369)
>  ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>         at 
> org.apache.hudi.HoodieBaseRelation.$anonfun$listLatestFileSlices$1(HoodieBaseRelation.scala:416)
>  ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>         at 
> org.apache.hudi.HoodieBaseRelation.$anonfun$listLatestFileSlices$1$adapted(HoodieBaseRelation.scala:416)
>  ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>         at scala.Option.foreach(Option.scala:407) 
> ~[scala-library-2.12.17.jar:?]
>         at 
> org.apache.hudi.HoodieBaseRelation.listLatestFileSlices(HoodieBaseRelation.scala:416)
>  ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>         at 
> org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:225)
>  ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>         at 
> org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:68)
>  ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>         at 
> org.apache.hudi.HoodieBaseRelation.buildScan(HoodieBaseRelation.scala:369) 
> ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
>         at 
> org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:323)
>  ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
>         at 
> org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:357)
>  ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
>         at 
> org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:413)
>  ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
>         at 
> org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:356)
>  ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
>         at 
> org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:323)
>  ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
>         at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
>  ~[spark-catalyst_2.12-3.2.3.jar:3.2.3]
>         at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) 
> ~[scala-library-2.12.17.jar:?]
>         at scala.collection.Iterator$$anon$11

[jira] [Updated] (HUDI-7812) Async Clustering w/ row writer fails due to timetravel query validation

2024-05-29 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7812:
--
Description: 
With the clustering row-writer-enabled flow, we trigger a time-travel query to 
read input records. But the query side fails if there are any pending commits 
(due to new ingestion) whose timestamp < clustering instant time. We need to 
relax this constraint. 

 
{code:java}
Failed to execute CLUSTERING service
    java.util.concurrent.CompletionException: 
org.apache.hudi.exception.HoodieTimeTravelException: Time travel's timestamp 
'20240406123837295' must be earlier than the first incomplete commit timestamp 
'20240406123834233'.
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 ~[?:1.8.0_392-internal]
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 ~[?:1.8.0_392-internal]
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
 ~[?:1.8.0_392-internal]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_392-internal]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_392-internal]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_392-internal]
    Caused by: org.apache.hudi.exception.HoodieTimeTravelException: Time 
travel's timestamp '20240406123837295' must be earlier than the first 
incomplete commit timestamp '20240406123834233'.
        at 
org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf(TimelineUtils.java:369)
 ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
        at 
org.apache.hudi.HoodieBaseRelation.$anonfun$listLatestFileSlices$1(HoodieBaseRelation.scala:416)
 ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
        at 
org.apache.hudi.HoodieBaseRelation.$anonfun$listLatestFileSlices$1$adapted(HoodieBaseRelation.scala:416)
 ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
        at scala.Option.foreach(Option.scala:407) ~[scala-library-2.12.17.jar:?]
        at 
org.apache.hudi.HoodieBaseRelation.listLatestFileSlices(HoodieBaseRelation.scala:416)
 ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
        at 
org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:225)
 ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
        at 
org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:68)
 ~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
        at 
org.apache.hudi.HoodieBaseRelation.buildScan(HoodieBaseRelation.scala:369) 
~[hudi-utilities-bundle_2.12-1.8.1-INTERNAL.jar:1.8.1-INTERNAL]
        at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:323)
 ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
        at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:357)
 ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
        at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:413)
 ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
        at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:356)
 ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
        at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:323)
 ~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
 ~[spark-catalyst_2.12-3.2.3.jar:3.2.3]
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) 
~[scala-library-2.12.17.jar:?]
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) 
~[scala-library-2.12.17.jar:?]
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) 
~[scala-library-2.12.17.jar:?]
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) 
~[spark-catalyst_2.12-3.2.3.jar:3.2.3]
        at 
org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67) 
~[spark-sql_2.12-3.2.3.jar:1.8.1-INTERNAL]
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
 ~[spark-catalyst_2.12-3.2.3.jar:3.2.3]
        at 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
~[scala-library-2.12.17.jar:?]
        at 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
~[scala-library-2.12.17.jar:?]
        at scala.collection.Iterator.foreach(Iterator.scala:943) 
~[scala-library-2.12.17.

[jira] [Updated] (HUDI-7812) Async Clustering w/ row writer fails due to timetravel query validation

2024-05-29 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7812:
--
Description: 
With the clustering row-writer-enabled flow, we trigger a time-travel query to 
read input records. But the query side fails if there are any pending commits 
(due to new ingestion) whose timestamp < clustering instant time. We need to 
relax this constraint. 

 

 

 

  was:
With the clustering row-writer-enabled flow, we trigger a time-travel query to 
read input records. But the query side fails if there are any pending commits 
(due to new ingestion) whose timestamp < clustering instant time. We need to 
relax this constraint. 

 


> Async Clustering w/ row writer fails due to timetravel query validation 
> 
>
> Key: HUDI-7812
> URL: https://issues.apache.org/jira/browse/HUDI-7812
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: clustering
>Reporter: sivabalan narayanan
>Priority: Major
>  Labels: pull-request-available
>
> With the clustering row-writer-enabled flow, we trigger a time-travel query to 
> read input records. But the query side fails if there are any pending commits 
> (due to new ingestion) whose timestamp < clustering instant time. We need to 
> relax this constraint. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7812) Async Clustering w/ row writer fails due to timetravel query validation

2024-05-29 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7812:
-

 Summary: Async Clustering w/ row writer fails due to timetravel 
query validation 
 Key: HUDI-7812
 URL: https://issues.apache.org/jira/browse/HUDI-7812
 Project: Apache Hudi
  Issue Type: Bug
  Components: clustering
Reporter: sivabalan narayanan


With the clustering row-writer-enabled flow, we trigger a time-travel query to 
read input records. But the query side fails if there are any pending commits 
(due to new ingestion) whose timestamp < clustering instant time. We need to 
relax this constraint. 
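
To make the failure mode concrete, here is a standalone sketch (a hypothetical 
class with plain strings for instant times, not the actual TimelineUtils code) 
of the validation the clustering-side read trips over, matching the 
HoodieTimeTravelException quoted earlier in this thread: the as-of timestamp 
must be strictly earlier than the first incomplete instant, which a pending 
ingestion commit older than the clustering instant violates.

{code:java}
import java.util.List;
import java.util.Optional;

// Standalone sketch of the check that fails the clustering read. Hudi instant
// times are lexicographically ordered strings, so plain string comparison works.
class TimeTravelValidationSketch {

  static void validateTimestampAsOf(String asOfTimestamp, List<String> pendingInstantTimes) {
    Optional<String> firstIncomplete = pendingInstantTimes.stream().sorted().findFirst();
    if (firstIncomplete.isPresent() && asOfTimestamp.compareTo(firstIncomplete.get()) >= 0) {
      throw new IllegalStateException("Time travel's timestamp '" + asOfTimestamp
          + "' must be earlier than the first incomplete commit timestamp '"
          + firstIncomplete.get() + "'.");
    }
  }

  public static void main(String[] args) {
    try {
      // A pending ingestion commit (20240406123834233) older than the clustering
      // instant (20240406123837295) reproduces the reported failure.
      validateTimestampAsOf("20240406123837295", List.of("20240406123834233"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
{code}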

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch branch-0.x updated: [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde (#11356)

2024-05-29 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
 new bd26797c8e6 [HUDI-7809] Use Spark SerializableConfiguration to avoid 
NPE in Kryo serde (#11356)
bd26797c8e6 is described below

commit bd26797c8e651694b31bdf614f55c9fc6a757f71
Author: Y Ethan Guo 
AuthorDate: Wed May 29 07:53:40 2024 -0700

[HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde 
(#11356)

* [HUDI-7809] Use Spark SerializableConfiguration to avoid NPE in Kryo serde

* Revert changes in HoodieBaseRelation
---
 .../test/java/org/apache/hudi/ColumnStatsIndexHelper.java|  7 +++
 .../parquet/Spark30LegacyHoodieParquetFileFormat.scala   | 12 ++--
 .../parquet/Spark31LegacyHoodieParquetFileFormat.scala   | 12 ++--
 .../parquet/Spark32LegacyHoodieParquetFileFormat.scala   | 12 ++--
 .../parquet/Spark33LegacyHoodieParquetFileFormat.scala   | 12 ++--
 .../parquet/Spark34LegacyHoodieParquetFileFormat.scala   | 12 ++--
 .../parquet/Spark35LegacyHoodieParquetFileFormat.scala   | 12 ++--
 7 files changed, 39 insertions(+), 40 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index 357200f5f0e..269a83bf7ac 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -21,9 +21,7 @@ import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.util.JavaScalaConverters;
@@ -51,6 +49,7 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructType$;
 import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.util.SerializableConfiguration;
 
 import javax.annotation.Nonnull;
 
@@ -163,7 +162,7 @@ public class ColumnStatsIndexHelper {
 .map(StructField::name)
 .collect(Collectors.toList());
 
-StorageConfiguration storageConf = 
HadoopFSUtils.getStorageConfWithCopy(sc.hadoopConfiguration());
+SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(sc.hadoopConfiguration());
 int numParallelism = (baseFilesPaths.size() / 3 + 1);
 
 String previousJobDescription = 
sc.getLocalProperty("spark.job.description");
@@ -178,7 +177,7 @@ public class ColumnStatsIndexHelper {
 Iterable iterable = () -> paths;
 return StreamSupport.stream(iterable.spliterator(), false)
 .flatMap(path -> {
-  HoodieStorage storage = new HoodieHadoopStorage(path, 
storageConf);
+  HoodieStorage storage = new HoodieHadoopStorage(path, 
serializableConfiguration.value());
   return utils.readColumnStatsFromMetadata(
   storage,
   new StoragePath(path),
diff --git 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
index bf6e222b763..59fde4af02f 100644
--- 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -49,6 +48,7 @@ import 
org.apache.spark.sql.execution.datasources.{DataSourceUtils, Part

(hudi) branch master updated (014f2d789dd -> 4a530837f1b)

2024-05-29 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


from 014f2d789dd [MINOR] Archive operation only releases lock when holds it 
(#11160)
 add 4a530837f1b [HUDI-7809] Use Spark SerializableConfiguration to avoid 
NPE in Kryo serde (#11355)

No new revisions were added by this update.

Summary of changes:
 .../HoodieFileGroupReaderBasedParquetFileFormat.scala| 12 +++-
 .../test/java/org/apache/hudi/ColumnStatsIndexHelper.java|  7 +++
 .../parquet/Spark30LegacyHoodieParquetFileFormat.scala   | 12 ++--
 .../parquet/Spark31LegacyHoodieParquetFileFormat.scala   | 12 ++--
 .../parquet/Spark32LegacyHoodieParquetFileFormat.scala   | 12 ++--
 .../parquet/Spark33LegacyHoodieParquetFileFormat.scala   | 12 ++--
 .../parquet/Spark34LegacyHoodieParquetFileFormat.scala   | 12 ++--
 .../parquet/Spark35LegacyHoodieParquetFileFormat.scala   | 12 ++--
 8 files changed, 46 insertions(+), 45 deletions(-)




[jira] [Updated] (HUDI-7655) Support configuration for clean to fail execution if there is at least one file is marked as a failed delete

2024-05-28 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7655:
--
Fix Version/s: 1.0.0

> Support configuration for clean to fail execution if there is at least one 
> file is marked as a failed delete
> 
>
> Key: HUDI-7655
> URL: https://issues.apache.org/jira/browse/HUDI-7655
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Krishen Bhan
>    Assignee: sivabalan narayanan
>Priority: Minor
>  Labels: clean, pull-request-available
> Fix For: 1.0.0
>
>
> When a HUDI clean plan is executed, any targeted file that was not confirmed 
> as deleted (or non-existing) will be marked as a "failed delete". Although 
> these failed deletes will be added to `.clean` metadata, if incremental clean 
> is used then these files might not ever be picked up again by a future clean 
> plan, unless a "full-scan" clean ends up being scheduled. In addition to 
> leading to more files unnecessarily taking up storage space for longer, this 
> can lead to the following dataset consistency issue for COW datasets:
>  # Insert at C1 creates file group f1 in partition
>  # Replacecommit at RC2 creates file group f2 in partition, and replaces f1
>  # Any reader of partition that calls HUDI API (with or without using MDT) 
> will recognize that f1 should be ignored, as it has been replaced, since the 
> RC2 instant file is in the active timeline
>  # Some completed instants later, an incremental clean is scheduled. It moves 
> the "earliest commit to retain" to a time after instant time RC2, so it 
> targets f1 for deletion. But during execution of the plan, it fails to delete 
> f1.
>  # An archive job is eventually triggered, and archives C1 and RC2. Note that 
> f1 is still in the partition
> At this point, any job/query that reads the aforementioned partition directly 
> via DFS file system calls (without directly using the MDT FILES partition) 
> will consider both f1 and f2 as valid file groups, since RC2 is no longer in 
> the active timeline. This is a data consistency issue, and will only be 
> resolved if a "full-scan" clean is triggered and deletes f1.
> This specific scenario can be avoided if the user can configure HUDI clean to 
> fail execution of a clean plan unless all files are confirmed as deleted (or 
> not existing in DFS already), "blocking" the clean. The next clean attempt 
> will re-execute this existing plan, since clean plans cannot be "rolled 
> back". 
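
A minimal sketch of the proposed behaviour. The flag and class names below are 
illustrative stand-ins, not actual Hudi configs: after executing the delete 
actions of a clean plan, fail the clean when any delete could not be confirmed, 
so that the same plan is re-attempted by the next clean instead of the file 
being recorded as a failed delete and possibly never revisited by incremental 
clean.

{code:java}
import java.util.List;

// Sketch only: "failIfDeletesNotConfirmed" stands in for the proposed
// (hypothetical) configuration; CleanFileResult is a stand-in for the
// per-file outcome of executing the clean plan.
class CleanExecutionSketch {

  static class CleanFileResult {
    final String path;
    final boolean deleteConfirmed; // true if deleted, or already absent
    CleanFileResult(String path, boolean deleteConfirmed) {
      this.path = path;
      this.deleteConfirmed = deleteConfirmed;
    }
  }

  static void finishCleanAction(List<CleanFileResult> results, boolean failIfDeletesNotConfirmed) {
    long failedDeletes = results.stream().filter(r -> !r.deleteConfirmed).count();
    if (failedDeletes > 0 && failIfDeletesNotConfirmed) {
      // Leave the clean plan in flight; the next clean attempt re-executes it,
      // since clean plans cannot be rolled back.
      throw new IllegalStateException(failedDeletes
          + " file delete(s) could not be confirmed; failing the clean so the plan is retried.");
    }
    // Otherwise write the .clean metadata, recording failed deletes as today.
  }
}
{code}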



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7807) spark-sql updates for a pk less table fails w/ partitioned table

2024-05-28 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7807:
-

 Summary: spark-sql updates for a pk less table fails w/ 
partitioned table 
 Key: HUDI-7807
 URL: https://issues.apache.org/jira/browse/HUDI-7807
 Project: Apache Hudi
  Issue Type: Bug
  Components: spark-sql
Reporter: sivabalan narayanan


Quick start fails when trying to UPDATE with spark-sql for a pk-less table. 

 
{code:java}
         > UPDATE hudi_table4 SET fare = 25.0 WHERE rider = 'rider-D';
24/05/28 11:44:41 WARN package: Truncated the string representation of a plan 
since it was too large. This behavior can be adjusted by setting 
'spark.sql.debug.maxToStringFields'.
24/05/28 11:44:41 ERROR SparkSQLDriver: Failed in [UPDATE hudi_table4 SET fare 
= 25.0 WHERE rider = 'rider-D']
org.apache.hudi.exception.HoodieException: Unable to instantiate class 
org.apache.hudi.keygen.SimpleKeyGenerator
at 
org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75)
at 
org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:123)
at 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:91)
at 
org.apache.hudi.util.SparkKeyGenUtils$.getPartitionColumns(SparkKeyGenUtils.scala:47)
at 
org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:218)
at 
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232)
at 
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:

[jira] [Updated] (HUDI-7807) spark-sql updates for a pk less table fails w/ partitioned table

2024-05-28 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7807:
--
Fix Version/s: 0.15.0
   1.0.0

> spark-sql updates for a pk less table fails w/ partitioned table 
> -
>
> Key: HUDI-7807
> URL: https://issues.apache.org/jira/browse/HUDI-7807
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: spark-sql
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
> Fix For: 0.15.0, 1.0.0
>
>
> Quick start fails when trying to UPDATE with spark-sql for a pk-less table. 
>  
> {code:java}
>          > UPDATE hudi_table4 SET fare = 25.0 WHERE rider = 'rider-D';
> 24/05/28 11:44:41 WARN package: Truncated the string representation of a plan 
> since it was too large. This behavior can be adjusted by setting 
> 'spark.sql.debug.maxToStringFields'.
> 24/05/28 11:44:41 ERROR SparkSQLDriver: Failed in [UPDATE hudi_table4 SET 
> fare = 25.0 WHERE rider = 'rider-D']
> org.apache.hudi.exception.HoodieException: Unable to instantiate class 
> org.apache.hudi.keygen.SimpleKeyGenerator
>   at 
> org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75)
>   at 
> org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:123)
>   at 
> org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:91)
>   at 
> org.apache.hudi.util.SparkKeyGenUtils$.getPartitionColumns(SparkKeyGenUtils.scala:47)
>   at 
> org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:218)
>   at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232)
>   at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
>   at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
>   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
>   at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
>   at 
> org.apache.spark.sql.exe

[jira] [Assigned] (HUDI-7807) spark-sql updates for a pk less table fails w/ partitioned table

2024-05-28 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7807:
-

Assignee: sivabalan narayanan

> spark-sql updates for a pk less table fails w/ partitioned table 
> -
>
> Key: HUDI-7807
> URL: https://issues.apache.org/jira/browse/HUDI-7807
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: spark-sql
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>
> The quick start fails when trying to UPDATE via spark-sql on a pk-less (no primaryKey configured) partitioned table. 
>  
> {code:java}
>          > UPDATE hudi_table4 SET fare = 25.0 WHERE rider = 'rider-D';
> 24/05/28 11:44:41 WARN package: Truncated the string representation of a plan 
> since it was too large. This behavior can be adjusted by setting 
> 'spark.sql.debug.maxToStringFields'.
> 24/05/28 11:44:41 ERROR SparkSQLDriver: Failed in [UPDATE hudi_table4 SET 
> fare = 25.0 WHERE rider = 'rider-D']
> org.apache.hudi.exception.HoodieException: Unable to instantiate class 
> org.apache.hudi.keygen.SimpleKeyGenerator
>   at 
> org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75)
>   at 
> org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:123)
>   at 
> org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:91)
>   at 
> org.apache.hudi.util.SparkKeyGenUtils$.getPartitionColumns(SparkKeyGenUtils.scala:47)
>   at 
> org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:218)
>   at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232)
>   at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
>   at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
>   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
>   at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
>   at 
> org.apache.spark.sql.execution.QueryExecution.eagerlyE
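
For context, below is a minimal, unverified reproduction sketch of the UPDATE failure described in HUDI-7807. Table, column, and app names are illustrative, and it assumes a local Spark session with the Hudi Spark bundle plus the quick-start session settings; it is not the project's own test case.

{code:java}
import org.apache.spark.sql.SparkSession;

public class PkLessUpdateRepro {
  public static void main(String[] args) {
    // Quick-start style session settings; adjust versions/bundles as needed.
    SparkSession spark = SparkSession.builder()
        .appName("hudi-pk-less-update-repro")
        .master("local[2]")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
        .getOrCreate();

    // A partitioned table created WITHOUT a primaryKey table property (pk-less).
    spark.sql("CREATE TABLE hudi_table4 (uuid STRING, rider STRING, fare DOUBLE, city STRING) "
        + "USING hudi PARTITIONED BY (city)");
    spark.sql("INSERT INTO hudi_table4 VALUES ('u1', 'rider-D', 10.0, 'sfo')");

    // This is the statement that fails with
    // 'Unable to instantiate class org.apache.hudi.keygen.SimpleKeyGenerator'.
    spark.sql("UPDATE hudi_table4 SET fare = 25.0 WHERE rider = 'rider-D'");

    spark.stop();
  }
}
{code}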

[jira] [Created] (HUDI-7800) Remove usages of instant time with HoodieRecordLocation

2024-05-25 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7800:
-

 Summary: Remove usages of instant time with HoodieRecordLocation
 Key: HUDI-7800
 URL: https://issues.apache.org/jira/browse/HUDI-7800
 Project: Apache Hudi
  Issue Type: Improvement
  Components: index
Reporter: sivabalan narayanan


HoodieRecordLocation holds a reference to the instant time. Strictly speaking, 
the partition path and fileId are what matter; the instant time should not. It 
is used in a few places, such as the HBase index, to account for rollbacks. At 
the very least, equals() in HoodieRecordLocation can ignore the instant time 
(see the sketch below). 
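
As a rough illustration of the proposal, here is a minimal sketch of instant-time-agnostic equality. The class below is a standalone stand-in, not the actual HoodieRecordLocation; it only mirrors the fields mentioned above (partition path, fileId, instant time).

{code:java}
import java.util.Objects;

// Illustrative stand-in for a record location; not the actual Hudi class.
final class RecordLocationSketch {
  private final String instantTime;    // kept for bookkeeping (e.g. rollback handling)
  private final String partitionPath;
  private final String fileId;

  RecordLocationSketch(String instantTime, String partitionPath, String fileId) {
    this.instantTime = instantTime;
    this.partitionPath = partitionPath;
    this.fileId = fileId;
  }

  // Equality intentionally ignores instantTime: two locations pointing at the
  // same partition path + fileId are considered the same location.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof RecordLocationSketch)) {
      return false;
    }
    RecordLocationSketch that = (RecordLocationSketch) o;
    return Objects.equals(partitionPath, that.partitionPath)
        && Objects.equals(fileId, that.fileId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(partitionPath, fileId);
  }
}
{code}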

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated: Bump h2 in /hudi-platform-service/hudi-metaserver (#9147)

2024-05-24 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new a0d60d98976 Bump h2 in /hudi-platform-service/hudi-metaserver (#9147)
a0d60d98976 is described below

commit a0d60d98976d53958e1e04352d2b3b714c395e9c
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
AuthorDate: Fri May 24 18:23:14 2024 -0700

Bump h2 in /hudi-platform-service/hudi-metaserver (#9147)

Bumps [h2](https://github.com/h2database/h2database) from 1.4.200 to 
2.2.220.
- [Release notes](https://github.com/h2database/h2database/releases)
- 
[Commits](https://github.com/h2database/h2database/compare/version-1.4.200...version-2.2.220)

---
updated-dependencies:
- dependency-name: com.h2database:h2
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] 
Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
---
 hudi-platform-service/hudi-metaserver/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-platform-service/hudi-metaserver/pom.xml 
b/hudi-platform-service/hudi-metaserver/pom.xml
index 38b25533fe5..b42a229fbd0 100644
--- a/hudi-platform-service/hudi-metaserver/pom.xml
+++ b/hudi-platform-service/hudi-metaserver/pom.xml
@@ -32,7 +32,7 @@
 
 
 ${project.parent.basedir}
-1.4.200
+2.2.220
 
 /usr/local
 docker



(hudi) branch master updated (660c6c7a5b0 -> 70a04c6c782)

2024-05-24 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


from 660c6c7a5b0 [MINOR] Fix the look up warning msg (#11286)
 add 70a04c6c782 Bump h2 in /packaging/hudi-metaserver-server-bundle (#9148)

No new revisions were added by this update.

Summary of changes:
 packaging/hudi-metaserver-server-bundle/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[jira] [Updated] (HUDI-7779) Guarding archival to not archive unintended commits

2024-05-24 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7779:
--
Description: 
Archiving commits from the active timeline could lead to data consistency 
issues on rare occasions. We should come up with proper guards to ensure we do 
not perform such unintended archival. 

 

The major gap we want to guard against is:

if someone disables the cleaner, archival should account for data consistency 
issues and bail out.

We have a base guarding condition, where archival will stop at the earliest 
commit to retain based on the latest clean commit metadata. But there are a few 
other scenarios that need to be accounted for. 

 

a. Keeping replace commits aside, let's dive into the specifics for regular 
commits and delta commits.

Say the user configured cleaning to retain 4 commits and the archival configs 
to 5 and 6 (see the config sketch below). After t10, the cleaner is supposed to 
clean up all file versions created at or before t6. Say the cleaner did not run 
(for whatever reason) for the next 5 commits. 
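
For reference, a minimal sketch of the writer configs this scenario maps to. The keys are the standard Hudi cleaning/archival options; mapping "5 and 6" to the min/max commits kept on the active timeline is an assumption made for illustration.

{code:java}
import java.util.Properties;

public class RetentionConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Cleaner: retain file versions for the last 4 commits.
    props.setProperty("hoodie.cleaner.commits.retained", "4");
    // Archival: keep between 5 and 6 commits on the active timeline.
    props.setProperty("hoodie.keep.min.commits", "5");
    props.setProperty("hoodie.keep.max.commits", "6");

    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
{code}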

    Archival will certainly be guarded up to the earliest commit to retain 
based on the latest clean commit metadata. 

Corner case to consider: 

A savepoint was added at, say, t3 and later removed, and the cleaner was never 
re-enabled. Archival would have been stopped at t3 while the savepoint was 
present, but once the savepoint is removed, if archival is executed, it could 
archive commit t3. That means the file versions tracked at t3 are still not yet 
cleaned by the cleaner. 

Reasoning: 

We are good here w.r.t. data consistency. Until the cleaner runs the next time, 
these older file versions might be exposed to the end user. But time travel 
queries are not intended for already cleaned up commits, so this is not an 
issue. None of snapshot, time travel, or incremental queries will run into 
issues, as they are not supposed to poll for t3. 

At any later point, if the cleaner is re-enabled, it will take care of cleaning 
up the file versions tracked at the t3 commit. In the interim period, though, 
some older file versions might still be exposed to readers. 

 

b. The trickier part is when replace commits are involved. Since the replace 
commit metadata in the active timeline is what ensures the replaced file groups 
are ignored for reads, the cleaner is expected to clean them up fully before 
that metadata is archived. But are there chances that this could go wrong? 

Corner case to consider: let's add onto the above scenario, where t3 has a 
savepoint, and t4 is a replace commit which replaced file groups tracked in t3. 

The cleaner will skip cleaning up files tracked by t3 (due to the presence of 
the savepoint), but will clean up t4, t5 and t6. So the earliest commit to 
retain will be pointing to t6. Now say the savepoint for t3 is removed, but the 
cleaner is disabled. In this state of the timeline, if archival is executed 
(since t3.savepoint is removed), archival might archive t3 and t4.rc. This 
could lead to data duplicates, as both the replaced file groups and the new 
file groups from t4.rc would be exposed as valid file groups. 

 

In other words, if we were to summarize the different scenarios (a guard along 
these lines is sketched after this list): 

i. The replaced file group is never cleaned up. 
    - ECTR (earliest commit to retain) is less than this replace commit, so 
archival never reaches it and we are good. 
ii. The replaced file group is cleaned up. 
    - ECTR is greater than this replace commit, so it is safe to archive.
iii. Tricky: ECTR moved ahead of this replace commit, but due to a savepoint, 
the full clean up did not happen. After the savepoint is removed and archival 
is executed, we should avoid archiving the replace commit of interest. This is 
the gap we do not account for as of now.
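
To make the summary above concrete, here is a minimal sketch of the kind of guard archival could apply before archiving a replace commit. The method and parameter names are hypothetical (this is not the actual Hudi archival code); the point is only the ECTR ordering plus the cleaned-up check.

{code:java}
// Hypothetical guard sketch. Hudi instant times are ordered lexicographically,
// so plain String comparison reflects timeline ordering. The
// replacedFileGroupsFullyCleaned flag stands in for whatever metadata ends up
// backing it (e.g. a cleaner-tracked "EarliestCommitToArchive" or a list of
// fully cleaned file groups).
final class ReplaceCommitArchivalGuard {

  boolean canArchiveReplaceCommit(String replaceCommitTime,
                                  String earliestCommitToRetain,
                                  boolean replacedFileGroupsFullyCleaned) {
    // Scenario i: ECTR has not moved past this replace commit, so archival
    // never reaches it anyway.
    if (replaceCommitTime.compareTo(earliestCommitToRetain) >= 0) {
      return false;
    }
    // Scenarios ii vs iii: ECTR moved ahead, but that alone is not enough.
    // Only archive once the replaced file groups are confirmed cleaned, which
    // covers the savepoint-removed-but-cleaner-disabled corner case.
    return replacedFileGroupsFullyCleaned;
  }
}
{code}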

 

We have 3 options to solve this.

Option A: 

Let the savepoint deletion flow take care of cleaning up the files it is 
tracking. 

Cons:

The savepoint's responsibility is not to remove any data files, so from a 
single-responsibility standpoint this may not be right. Also, this clean up 
might need to do what the clean planner already does, i.e. build the file 
system view, understand whether a file is supposed to be cleaned up, and only 
then delete the files that qualify. For example, if a file group has only one 
file slice, it should not be cleaned up, and there are other scenarios like 
this. 

 

Option B:

Since archival is the one that might cause data consistency issues, why not 
have archival do the clean up. 

We would need to account for concurrent cleans, failure and retry scenarios, 
etc. Also, we might need to build the file system view and then decide whether 
something needs to be cleaned up before archiving it. 

Cons:

Again, the single-responsibility rule might be broken. It would be neater if 
the cleaner takes care of deleting data files and archival only takes care of 
deleting/archiving timeline files. 

 

Option C:

Similar to how the cleaner maintains EarliestCommitToRetain, let the cleaner 
track another metadata entry named "EarliestCommitToArchive". Strictly speaking, ear


[jira] [Assigned] (HUDI-7779) Guarding archival to not archive unintended commits

2024-05-23 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7779:
-

Assignee: sivabalan narayanan

> Guarding archival to not archive unintended commits
> ---
>
> Key: HUDI-7779
> URL: https://issues.apache.org/jira/browse/HUDI-7779
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: archiving
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>
> Archiving commits from active timeline could lead to data consistency issues 
> on some rarest of occasions. We should come up with proper guards to ensure 
> we do not make such unintended archival. 
>  
> Major gap which we wanted to guard is:
> if someone disabled cleaner, archival should account for data consistency 
> issues and ensure it bails out.
> We have a base guarding condition, where archival will stop at the earliest 
> commit to retain based on latest clean commit metadata. But there are few 
> other scenarios that needs to be accounted for. 
>  
> a. Keeping aside replace commits, lets dive into specifics for regular 
> commits and delta commits.
> Say user configured clean commits to 4 and archival configs to 5 and 6. after 
> t10, cleaner is supposed to clean up all file versions created at or before 
> t6. Say cleaner did not run(for whatever reason for next 5 commits). 
>     Archival will certainly be guarded until earliest commit to retain based 
> on latest clean commits. 
> Corner case to consider: 
> A savepoint was added to say t3 and later removed. and still the cleaner was 
> never re-enabled. Even though archival would have been stopped at t3 (until 
> savepoint is present),but once savepoint is removed, if archival is executed, 
> it could archive commit t3. Which means, file versions tracked at t3 is still 
> not yet cleaned by the cleaner. 
> Reasoning: 
> We are good here wrt data consistency. Up until cleaner runs next time, this 
> older file versions might be exposed to the end-user. But time travel query 
> is not intended for already cleaned up commits and hence this is not an 
> issue. None of snapshot, time travel query or incremental query will run into 
> issues as they are not supposed to poll for t3. 
> At any later point, if cleaner is re-enabled, it will take care of cleaning 
> up file versions tracked at t3 commit. Just that for interim period, some 
> older file versions might still be exposed to readers. 
>  
> b. The more tricky part is when replace commits are involved. Since replace 
> commit metadata in active timeline is what ensures the replaced file groups 
> are ignored for reads, before archiving the same, cleaner is expected to 
> clean them up fully. But are there chances that this could go wrong? 
> Corner case to consider. Lets add onto above scenario, where t3 has a 
> savepoint, and t4 is a replace commit which replaced file groups tracked in 
> t3. 
> Cleaner will skip cleaning up files tracked by t3(due to the presence of 
> savepoint), but will clean up t4, t5 and t6. So, earliest commit to retain 
> will be pointing to t6. And say savepoint for t3 is removed, but cleaner was 
> disabled. In this state of the timeline, if archival is executed, (since 
> t3.savepoint is removed), archival might archive t3 and t4.rc.  This could 
> lead to data duplicates as both replaced file groups and new file groups from 
> t4.rc would be exposed as valid file groups. 
>  
> In other words, if we were to summarize the different scenarios: 
> i. replaced file group is never cleaned up. 
>     - ECTR(Earliest commit to retain) is less than this.rc and we are good. 
> ii. replaced file group is cleaned up. 
>     - ECTR is > this.rc and is good to archive.
> iii. tricky: ECTR moved ahead compared to this.rc, but due to savepoint, full 
> clean up did not happen.  After savepoint is removed, and when archival is 
> executed, we should avoid archiving the rc of interest. This is the gap we 
> don't account for as of now.
>  
> We have 3 options to go about to solve this.
> Option A: 
> Let Savepoint deletion flow take care of cleaning up the files its tracking. 
> cons:
> Savepoint's responsibility is not removing any data files. So, from a single 
> user responsibility rule, this may not be right. Also, this clean up might 
> need to do what a clean planner might actually be doing. ie. build file 
> system view, understand if its supposed to be cleaned up already, and then 
> only clean up the files which are supposed to be cleaned up. For eg, if 

[jira] [Commented] (HUDI-7779) Guarding archival to not archive unintended commits

2024-05-23 Thread sivabalan narayanan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849139#comment-17849139
 ] 

sivabalan narayanan commented on HUDI-7779:
---

Hey Sagar,

     I updated the Jira description with more details. Can you check it out?

> Guarding archival to not archive unintended commits
> ---
>
> Key: HUDI-7779
> URL: https://issues.apache.org/jira/browse/HUDI-7779
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: archiving
>Reporter: sivabalan narayanan
>Priority: Major
>
> Archiving commits from active timeline could lead to data consistency issues 
> on some rarest of occasions. We should come up with proper guards to ensure 
> we do not make such unintended archival. 
>  
> Major gap which we wanted to guard is:
> if someone disabled cleaner, archival should account for data consistency 
> issues and ensure it bails out.
> We have a base guarding condition, where archival will stop at the earliest 
> commit to retain based on latest clean commit metadata. But there are few 
> other scenarios that needs to be accounted for. 
>  
> a. Keeping aside replace commits, lets dive into specifics for regular 
> commits and delta commits.
> Say user configured clean commits to 4 and archival configs to 5 and 6. after 
> t10, cleaner is supposed to clean up all file versions created at or before 
> t6. Say cleaner did not run(for whatever reason for next 5 commits). 
>     Archival will certainly be guarded until earliest commit to retain based 
> on latest clean commits. 
> Corner case to consider: 
> A savepoint was added to say t3 and later removed. and still the cleaner was 
> never re-enabled. Even though archival would have been stopped at t3 (until 
> savepoint is present),but once savepoint is removed, if archival is executed, 
> it could archive commit t3. Which means, file versions tracked at t3 is still 
> not yet cleaned by the cleaner. 
> Reasoning: 
> We are good here wrt data consistency. Up until cleaner runs next time, this 
> older file versions might be exposed to the end-user. But time travel query 
> is not intended for already cleaned up commits and hence this is not an 
> issue. None of snapshot, time travel query or incremental query will run into 
> issues as they are not supposed to poll for t3. 
> At any later point, if cleaner is re-enabled, it will take care of cleaning 
> up file versions tracked at t3 commit. Just that for interim period, some 
> older file versions might still be exposed to readers. 
>  
> b. The more tricky part is when replace commits are involved. Since replace 
> commit metadata in active timeline is what ensures the replaced file groups 
> are ignored for reads, before archiving the same, cleaner is expected to 
> clean them up fully. But are there chances that this could go wrong? 
> Corner case to consider. Lets add onto above scenario, where t3 has a 
> savepoint, and t4 is a replace commit which replaced file groups tracked in 
> t3. 
> Cleaner will skip cleaning up files tracked by t3(due to the presence of 
> savepoint), but will clean up t4, t5 and t6. So, earliest commit to retain 
> will be pointing to t6. And say savepoint for t3 is removed, but cleaner was 
> disabled. In this state of the timeline, if archival is executed, (since 
> t3.savepoint is removed), archival might archive t3 and t4.rc.  This could 
> lead to data duplicates as both replaced file groups and new file groups from 
> t4.rc would be exposed as valid file groups. 
>  
> In other words, if we were to summarize the different scenarios: 
> i. replaced file group is never cleaned up. 
>     - ECTR(Earliest commit to retain) is less than this.rc and we are good. 
> ii. replaced file group is cleaned up. 
>     - ECTR is > this.rc and is good to archive.
> iii. tricky: ECTR moved ahead compared to this.rc, but due to savepoint, full 
> clean up did not happen.  After savepoint is removed, and when archival is 
> executed, we should avoid archiving the rc of interest. This is the gap we 
> don't account for as of now.
>  
> We have 3 options to go about to solve this.
> Option A: 
> Let Savepoint deletion flow take care of cleaning up the files its tracking. 
> cons:
> Savepoint's responsibility is not removing any data files. So, from a single 
> user responsibility rule, this may not be right. Also, this clean up might 
> need to do what a clean planner might actually be doing. ie. build file 
> system view, understand if its supposed to be cleaned up already, and then 
> only clean up the files which are suppo

(hudi) branch branch-0.x updated: [HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11270)

2024-05-22 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
 new 2e39b41be07 [HUDI-7784] Fix serde of HoodieHadoopConfiguration in 
Spark (#11270)
2e39b41be07 is described below

commit 2e39b41be07d42c0d41fd2cf765732e592954466
Author: Y Ethan Guo 
AuthorDate: Wed May 22 15:27:48 2024 -0700

[HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11270)
---
 .../apache/spark/HoodieSparkKryoRegistrar.scala|  6 +-
 .../apache/spark/TestHoodieSparkKryoRegistrar.java | 86 ++
 2 files changed, 89 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
index a8650e5668a..eba3999ea57 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.client.model.HoodieInternalRow
 import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
 import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.storage.StorageConfiguration
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.serializers.JavaSerializer
@@ -64,8 +64,8 @@ class HoodieSparkKryoRegistrar extends 
HoodieCommonKryoRegistrar with KryoRegist
 //   Hadoop's configuration is not a serializable object by itself, 
and hence
 //   we're relying on [[SerializableConfiguration]] wrapper to work it 
around.
 //   We cannot remove this entry; otherwise the ordering is changed.
-//   So we replace it with [[StorageConfiguration]].
-kryo.register(classOf[StorageConfiguration[_]], new JavaSerializer())
+//   So we replace it with [[HadoopStorageConfiguration]] for Spark.
+kryo.register(classOf[HadoopStorageConfiguration], new JavaSerializer())
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java
new file mode 100644
index 000..4dd297a02b6
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link HoodieSparkKryoRegistrar}
+ */
+public class TestHoodieSparkKryoRegistrar {
+  @Test
+  public void testSerdeHoodieHadoopConfiguration() {
+Kryo kryo = newKryo();
+
+HadoopStorageConfiguration conf = new HadoopStorageConfiguration(new 
Configuration());
+
+// Serialize
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+Output output = new Output(baos);
+kryo.writeObject(output, conf);
+output.close();
+
+// Deserialize
+ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+Input input = new Input(bais);
+HadoopStorageConfiguration deserialized = kryo.readObject(input, 
HadoopStorageConfiguration.class);
+input.close();
+
+// Verify
+assertEquals(getPropsInMap(conf), getPropsInMap(deserialized));
+  }
+
+  private Kryo newKryo() {
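
For reference, a minimal sketch of how the registrar touched by this fix is typically wired into a Spark application. The configuration keys are standard Spark ones; treat the exact setup as illustrative rather than prescriptive.

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class KryoRegistrarSetupSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("hudi-kryo-registrar-example")
        .setMaster("local[2]")
        // Use Kryo and point it at Hudi's registrar so that Hudi types
        // (including the storage configuration wrapper touched above)
        // are registered in a stable order.
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");

    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    spark.stop();
  }
}
{code}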

(hudi) branch master updated: [HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11269)

2024-05-22 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new e1aa1bcb4af [HUDI-7784] Fix serde of HoodieHadoopConfiguration in 
Spark (#11269)
e1aa1bcb4af is described below

commit e1aa1bcb4af5cbcad9e985d0333eb5275e128e5b
Author: Y Ethan Guo 
AuthorDate: Wed May 22 15:27:03 2024 -0700

[HUDI-7784] Fix serde of HoodieHadoopConfiguration in Spark (#11269)
---
 .../apache/spark/HoodieSparkKryoRegistrar.scala|  6 +-
 .../apache/spark/TestHoodieSparkKryoRegistrar.java | 86 ++
 2 files changed, 89 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
index a8650e5668a..eba3999ea57 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.client.model.HoodieInternalRow
 import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
 import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.storage.StorageConfiguration
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.serializers.JavaSerializer
@@ -64,8 +64,8 @@ class HoodieSparkKryoRegistrar extends 
HoodieCommonKryoRegistrar with KryoRegist
 //   Hadoop's configuration is not a serializable object by itself, 
and hence
 //   we're relying on [[SerializableConfiguration]] wrapper to work it 
around.
 //   We cannot remove this entry; otherwise the ordering is changed.
-//   So we replace it with [[StorageConfiguration]].
-kryo.register(classOf[StorageConfiguration[_]], new JavaSerializer())
+//   So we replace it with [[HadoopStorageConfiguration]] for Spark.
+kryo.register(classOf[HadoopStorageConfiguration], new JavaSerializer())
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java
new file mode 100644
index 000..4dd297a02b6
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/TestHoodieSparkKryoRegistrar.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link HoodieSparkKryoRegistrar}
+ */
+public class TestHoodieSparkKryoRegistrar {
+  @Test
+  public void testSerdeHoodieHadoopConfiguration() {
+Kryo kryo = newKryo();
+
+HadoopStorageConfiguration conf = new HadoopStorageConfiguration(new 
Configuration());
+
+// Serialize
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+Output output = new Output(baos);
+kryo.writeObject(output, conf);
+output.close();
+
+// Deserialize
+ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+Input input = new Input(bais);
+HadoopStorageConfiguration deserialized = kryo.readObject(input, 
HadoopStorageConfiguration.class);
+input.close();
+
+// Verify
+assertEquals(getPropsInMap(conf), getPropsInMap(deserialized));
+  }
+
+  private Kryo newKryo() {
+

[jira] [Created] (HUDI-7780) Avoid 0 record parquet files

2024-05-19 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7780:
-

 Summary: Avoid 0 record parquet files
 Key: HUDI-7780
 URL: https://issues.apache.org/jira/browse/HUDI-7780
 Project: Apache Hudi
  Issue Type: Improvement
  Components: writer-core
Reporter: sivabalan narayanan


There are occasions where Hudi could produce 0-record files. 

For example:

a. The entire set of records is deleted in a log block and, due to small file 
handling, a new parquet file is created with HoodieMergeHandle. 

b. During compaction, there are again chances that Hudi might produce 0-record 
parquet files. 

We need to avoid such files if possible (see the sketch below). 
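
A rough sketch of one possible guard at file-finalization time is shown below. The class and method are illustrative only (not the actual HoodieMergeHandle/compaction API); the idea is simply to drop the base file and skip its write status when nothing was written.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative guard: if a write handle finished with zero records, delete the
// empty base file instead of publishing it.
final class ZeroRecordFileGuard {

  /** Returns true if the base file should be kept, false if it was dropped. */
  static boolean finalizeBaseFile(Path baseFile, long recordsWritten) throws IOException {
    if (recordsWritten == 0) {
      // Nothing survived the merge/compaction for this file group; avoid
      // creating a 0-record parquet file.
      Files.deleteIfExists(baseFile);
      return false;
    }
    return true;
  }
}
{code}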

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7779) Guarding archival to not archive unintended commits

2024-05-18 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7779:
--
Description: 
Archiving commits from active timeline could lead to data consistency issues on 
some rarest of occasions. We should come up with proper guards to ensure we do 
not make such unintended archival. 

 

Major gap which we wanted to guard is:

if someone disabled cleaner, archival should account for data consistency 
issues and ensure it bails out.

We have a base guarding condition, where archival will stop at the earliest 
commit to retain based on latest clean commit metadata. But there are few other 
scenarios that needs to be accounted for. 

 

a. Keeping aside replace commits, lets dive into specifics for regular commits 
and delta commits.

Say user configured clean commits to 4 and archival configs to 5 and 6. after 
t10, cleaner is supposed to clean up all file versions created at or before t6. 
Say cleaner did not run(for whatever reason for next 5 commits). 

    Archival will certainly be guarded until earliest commit to retain based on 
latest clean commits. 

Corner case to consider: 

A savepoint was added to say t3 and later removed. and still the cleaner was 
never re-enabled. Even though archival would have been stopped at t3 (until 
savepoint is present),but once savepoint is removed, if archival is executed, 
it could archive commit t3. Which means, file versions tracked at t3 is still 
not yet cleaned by the cleaner. 

Reasoning: 

We are good here w.r.t. data consistency. Until the cleaner runs the next time, 
these older file versions might be exposed to the end user. But time travel 
queries are not intended for already cleaned up commits, so this is not an 
issue. None of snapshot, time travel, or incremental queries will run into 
issues, as they are not supposed to poll for t3. 

At any later point, if the cleaner is re-enabled, it will take care of cleaning 
up the file versions tracked at the t3 commit. 

 

b. The more tricky part is when replace commits are involved. Since replace 
commit metadata in active timeline is what ensures the replaced file groups are 
ignored for reads, before archiving the same, cleaner is expected to clean them 
up fully. But are there chances that this could go wrong? 

Corner case to consider. Lets add onto above scenario, where t3 has a 
savepoint, and t4 is a replace commit which replaced file groups tracked in t3. 

Cleaner will skip cleaning up files tracked by t3, but will clean up t4, t5 and 
t6. So, earliest commit to retain will be pointing to t6. And say savepoint for 
t3 is removed, but cleaner was disabled. In this state of the timeline, if 
archival is executed, (since t3.savepoint is removed), archival might archive 
t3 and t4.rc.  This could lead to data duplicates as both replaced file groups 
and new file groups from t4.rc would be exposed as valid file groups. 

 

In other words, if we were to summarize the different scenarios: 

i. replaced file group is never cleaned up. 
    - ECTR(Earliest commit to retain) is less than this.rc and we are good. 
ii. replaced file group is cleaned up. 
    - ECTR is > this.rc and is good to archive.
iii. tricky: ECTR moved ahead compared to this.rc, but due to savepoint, full 
clean up did not happen.  After savepoint is removed, and when archival is 
executed, we should avoid archiving the rc of interest. This is the gap we 
don't account for as of now.

 

We have two options to solve this.

*Option A:* 

Before the archiver archives any replace commit, let's explicitly check that all 
the replaced file groups are fully deleted. 

Cons: might need FileSystemView polling, which could be costly. 

*Option B:*

The cleaner additionally tracks metadata named "fully cleaned up file groups" at 
the end of clean planning and in the completed clean commit metadata. 

Archival, instead of polling the FileSystemView (which might be costly), can 
then check the clean commit metadata for this list of file groups and deduce 
whether all file groups replaced by X.rc are fully deleted. 

Pros: 

Since the clean planner polls the file system view anyway and already has all 
the file group info, no additional work should be required to deduce the "fully 
cleaned up file groups"; it just needs to add that extra metadata. A minimal 
sketch of the resulting archival guard follows below. 
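A minimal sketch of how such a guard could look, assuming the completed clean 
commit metadata exposes the set of fully cleaned file group ids (the names below 
are illustrative placeholders, not existing Hudi APIs):

{code:java}
import java.util.List;
import java.util.Set;

public class ReplaceCommitArchivalGuard {
  // Hypothetical view of Option B: clean commit metadata carries the ids of
  // file groups whose file versions were fully deleted ("fully cleaned up file groups").
  static boolean canArchiveReplaceCommit(List<String> replacedFileGroupIds,
                                         Set<String> fullyCleanedFileGroupIds) {
    // Archive X.rc only if every file group it replaced is reported as fully cleaned;
    // otherwise keep the replace commit in the active timeline so readers keep
    // ignoring the replaced file groups.
    return fullyCleanedFileGroupIds.containsAll(replacedFileGroupIds);
  }
}
{code}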


  was:
Archiving commits from the active timeline could lead to data consistency issues 
on the rarest of occasions. We should come up with proper guards to ensure we do 
not perform such unintended archival. 

 

The major gap we want to guard against is:

if someone disables the cleaner, archival should account for data consistency 
issues and bail out.

We have a base guarding condition, where archival will stop at the earliest 
commit to retain based on the latest clean commit metadata. But there are a few 
other scenarios that need to be accounted for

[jira] [Created] (HUDI-7779) Guarding archival to not archive unintended commits

2024-05-18 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7779:
-

 Summary: Guarding archival to not archive unintended commits
 Key: HUDI-7779
 URL: https://issues.apache.org/jira/browse/HUDI-7779
 Project: Apache Hudi
  Issue Type: Bug
  Components: archiving
Reporter: sivabalan narayanan


Archiving commits from the active timeline could lead to data consistency issues 
on the rarest of occasions. We should come up with proper guards to ensure we do 
not perform such unintended archival. 

 

The major gap we want to guard against is:

if someone disables the cleaner, archival should account for data consistency 
issues and bail out.

We have a base guarding condition, where archival will stop at the earliest 
commit to retain based on the latest clean commit metadata. But there are a few 
other scenarios that need to be accounted for. 

 

a. Keeping replace commits aside, let's dive into the specifics for regular 
commits and delta commits.

Say the user configured the cleaner to retain 4 commits and archival to keep a 
minimum of 5 and a maximum of 6 commits. After t10, the cleaner is supposed to 
clean up all file versions created at or before t6. Say the cleaner did not run 
(for whatever reason) for the next 5 commits. 

    Archival will certainly be guarded until the earliest commit to retain based 
on the latest clean commit. 

Corner case to consider: 

A savepoint was added at, say, t3 and later removed, and the cleaner was never 
re-enabled. Archival would have stopped at t3 while the savepoint was present, 
but once the savepoint is removed, if archival is executed, it could archive 
commit t3. This means the file versions tracked at t3 are still 
not yet cleaned by the cleaner. 

Reasoning: 

We are good here w.r.t. data consistency. Until the cleaner next runs, these 
older file versions might be exposed to the end user. But time travel queries 
are not intended for commits that are already due for cleaning, and hence this 
is not an issue. None of snapshot, time travel or incremental queries will run 
into issues, as they are not supposed to poll for t3. 

If the cleaner is re-enabled at any later point, it will take care of cleaning 
up the file versions tracked by commit t3. 

 

b. The trickier part is when replace commits are involved. Since the replace 
commit metadata in the active timeline is what ensures the replaced file groups 
are ignored for reads, the cleaner is expected to clean them up fully before 
that metadata is archived. But are there chances that this could go wrong? 

Corner case to consider: let's add onto the above scenario, where t3 has a 
savepoint, and t4 is a replace commit which replaced file groups tracked in t3. 

The cleaner will skip cleaning up the files tracked by t3, but will clean up t4, 
t5 and t6. So the earliest commit to retain will point to t6. Now say the 
savepoint for t3 is removed, but the cleaner stays disabled. In this state of 
the timeline, if archival is executed (since t3.savepoint is removed), it might 
archive t3 and t4.rc. This could lead to data duplicates, as both the replaced 
file groups and the new file groups from t4.rc would be exposed as valid file 
groups. 

 

In other words, if we were to summarize the different scenarios: 

i. The replaced file group is never cleaned up. 
    - ECTR (earliest commit to retain) is less than this.rc, and we are good. 
ii. The replaced file group is cleaned up. 
    - ECTR is > this.rc, and it is good to archive.
iii. Tricky: ECTR moved ahead compared to this.rc, but due to the savepoint, a 
full clean up did not happen. After the savepoint is removed, and when archival 
is executed, we should avoid archiving the rc of interest. This is the gap we 
don't account for as of now.

 

We have two options to solve this.

*Option A:* 

Before the archiver archives any replace commit, let's explicitly check that all 
the replaced file groups are fully deleted. 

Cons: might need FileSystemView polling, which could be costly. 

*Option B:*

The cleaner additionally tracks metadata named "fully cleaned up file groups" at 
the end of clean planning and in the completed clean commit metadata. 

Archival, instead of polling the FileSystemView (which might be costly), can 
then check the clean commit metadata for this list of file groups and deduce 
whether all file groups replaced by X.rc are fully deleted. 

Pros: 

Since the clean planner polls the file system view anyway and already has all 
the file group info, no additional work should be required to deduce the "fully 
cleaned up file groups"; it just needs to add that extra metadata. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7778) Duplicate Key exception with RLI

2024-05-17 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7778:
-

 Summary: Duplicate Key exception with RLI 
 Key: HUDI-7778
 URL: https://issues.apache.org/jira/browse/HUDI-7778
 Project: Apache Hudi
  Issue Type: Bug
  Components: metadata
Reporter: sivabalan narayanan


We are occasionally hitting the exception below, which means two records were 
ingested into RLI for the same record key from the data table. This is not 
expected to happen. 

 
{code:java}
Caused by: org.apache.hudi.exception.HoodieAppendException: Failed while 
appending records to 
file:/var/folders/ym/8yjkm3n90kq8tk4gfmvk7y14gn/T/junit2792173348364470678/.hoodie/metadata/record_index/.record-index-0009-0_00011.log.3_3-275-476
   at 
org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:475)
 at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:439) 
 at 
org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:90)
 at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:355)
  ... 28 moreCaused by: org.apache.hudi.exception.HoodieException: Writing 
multiple records with same key 1 not supported for 
org.apache.hudi.common.table.log.block.HoodieHFileDataBlock at 
org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.serializeRecords(HoodieHFileDataBlock.java:146)
  at 
org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:121)
 at 
org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:166)
  at 
org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:467)
 ... 31 more
Driver stacktrace:51301 [main] INFO  org.apache.spark.scheduler.DAGScheduler [] 
- Job 78 failed: collect at HoodieJavaRDD.java:177, took 0.245313 s51303 [main] 
INFO  org.apache.hudi.client.BaseHoodieClient [] - Stopping Timeline service 
!!51303 [main] INFO  org.apache.hudi.client.embedded.EmbeddedTimelineService [] 
- Closing Timeline server51303 [main] INFO  
org.apache.hudi.timeline.service.TimelineService [] - Closing Timeline 
Service51321 [main] INFO  org.apache.hudi.timeline.service.TimelineService [] - 
Closed Timeline Service51321 [main] INFO  
org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closed Timeline 
server
org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit 
time 197001012
at 
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:80)
   at 
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:47)
  at 
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:98)
at 
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:88)
at 
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:156) 
 at 
org.apache.hudi.functional.TestGlobalIndexEnableUpdatePartitions.testUdpateSubsetOfRecUpdates(TestGlobalIndexEnableUpdatePartitions.java:225)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
   at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
  at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
   at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
 at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45

[jira] [Assigned] (HUDI-7778) Duplicate Key exception with RLI

2024-05-17 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7778:
-

Assignee: sivabalan narayanan

> Duplicate Key exception with RLI 
> -
>
> Key: HUDI-7778
> URL: https://issues.apache.org/jira/browse/HUDI-7778
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: metadata
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>
> We are occasionally hitting the exception below, which means two records were 
> ingested into RLI for the same record key from the data table. This is not 
> expected to happen. 
>  
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieAppendException: Failed while 
> appending records to 
> file:/var/folders/ym/8yjkm3n90kq8tk4gfmvk7y14gn/T/junit2792173348364470678/.hoodie/metadata/record_index/.record-index-0009-0_00011.log.3_3-275-476
>  at 
> org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:475)
>  at 
> org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:439)  
> at 
> org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:90)
>  at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:355)
>   ... 28 moreCaused by: org.apache.hudi.exception.HoodieException: 
> Writing multiple records with same key 1 not supported for 
> org.apache.hudi.common.table.log.block.HoodieHFileDataBlock at 
> org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.serializeRecords(HoodieHFileDataBlock.java:146)
>   at 
> org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:121)
>  at 
> org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:166)
>   at 
> org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:467)
>  ... 31 more
> Driver stacktrace:51301 [main] INFO  org.apache.spark.scheduler.DAGScheduler 
> [] - Job 78 failed: collect at HoodieJavaRDD.java:177, took 0.245313 s51303 
> [main] INFO  org.apache.hudi.client.BaseHoodieClient [] - Stopping Timeline 
> service !!51303 [main] INFO  
> org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closing Timeline 
> server51303 [main] INFO  org.apache.hudi.timeline.service.TimelineService [] 
> - Closing Timeline Service51321 [main] INFO  
> org.apache.hudi.timeline.service.TimelineService [] - Closed Timeline 
> Service51321 [main] INFO  
> org.apache.hudi.client.embedded.EmbeddedTimelineService [] - Closed Timeline 
> server
> org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit 
> time 197001012
>   at 
> org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:80)
>at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:47)
>   at 
> org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:98)
> at 
> org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:88)
> at 
> org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:156)
>   at 
> org.apache.hudi.functional.TestGlobalIndexEnableUpdatePartitions.testUdpateSubsetOfRecUpdates(TestGlobalIndexEnableUpdatePartitions.java:225)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
>at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
> 

[jira] [Assigned] (HUDI-7771) Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0

2024-05-15 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7771:
-

Assignee: sivabalan narayanan

> Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0
> ---
>
> Key: HUDI-7771
> URL: https://issues.apache.org/jira/browse/HUDI-7771
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: writer-core
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>
> We made "DefaultHoodieRecordPayload" the default for 1.x, but let's keep it as 
> OverwriteWithLatestAvroPayload for 0.15.0. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7771) Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0

2024-05-15 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7771:
--
Fix Version/s: 0.15.0

> Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0
> ---
>
> Key: HUDI-7771
> URL: https://issues.apache.org/jira/browse/HUDI-7771
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: writer-core
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
> Fix For: 0.15.0
>
>
> We made "DefaultHoodieRecordPayload" the default for 1.x, but let's keep it as 
> OverwriteWithLatestAvroPayload for 0.15.0. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7771) Make default hoodie record payload as OverwriteWithLatestPayload for 0.15.0

2024-05-15 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7771:
-

 Summary: Make default hoodie record payload as 
OverwriteWithLatestPayload for 0.15.0
 Key: HUDI-7771
 URL: https://issues.apache.org/jira/browse/HUDI-7771
 Project: Apache Hudi
  Issue Type: Improvement
  Components: writer-core
Reporter: sivabalan narayanan


We made "DefaultHoodieRecordPayload" the default for 1.x, but let's keep it as 
OverwriteWithLatestAvroPayload for 0.15.0. 
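Regardless of which default ships in a given release, a minimal sketch of 
pinning the payload class explicitly on the writer, so behavior does not depend 
on the version default (the table name and field names below are hypothetical):

{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class PayloadClassExample {
  static void writeWithExplicitPayload(Dataset<Row> df, String basePath) {
    df.write()
      .format("hudi")
      .option("hoodie.table.name", "my_table")                    // hypothetical table name
      .option("hoodie.datasource.write.recordkey.field", "uuid")  // hypothetical record key field
      .option("hoodie.datasource.write.precombine.field", "ts")   // hypothetical ordering field
      .option("hoodie.datasource.write.payload.class",
          "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")
      .mode(SaveMode.Append)
      .save(basePath);
  }
}
{code}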



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) 01/01: Disabling flaky tests

2024-05-15 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch 
branch-0.x-failing-tests-test-mult-writer-archival
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ebea9b7c5152d135610bb35de89cf1d9e1ab1449
Author: sivabalan 
AuthorDate: Wed May 15 15:58:11 2024 -0700

Disabling flaky tests
---
 .../src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index e9fccfc7054..dce049ca275 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -63,6 +63,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -683,8 +684,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
 assertThrows(HoodieException.class, () -> 
metaClient.getArchivedTimeline().reload());
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {false, true})
+  @Disabled("HUDI-6386")
   public void testArchivalWithMultiWriters(boolean enableMetadata) throws 
Exception {
 HoodieWriteConfig writeConfig = 
initTestTableAndGetWriteConfig(enableMetadata, 4, 5, 5, 2,
 HoodieTableType.COPY_ON_WRITE, false, 10, 209715200,



(hudi) branch branch-0.x-failing-tests-test-mult-writer-archival created (now ebea9b7c515)

2024-05-15 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch 
branch-0.x-failing-tests-test-mult-writer-archival
in repository https://gitbox.apache.org/repos/asf/hudi.git


  at ebea9b7c515 Disabling flaky tests

This branch includes the following new commits:

 new ebea9b7c515 Disabling flaky tests

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[jira] [Assigned] (HUDI-7768) Fix failing tests for 0.15.0 release (async compaction and metadata num commits check)

2024-05-15 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7768:
-

Assignee: sivabalan narayanan

> Fix failing tests for 0.15.0 release (async compaction and metadata num 
> commits check)
> --
>
> Key: HUDI-7768
> URL: https://issues.apache.org/jira/browse/HUDI-7768
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: tests-ci
>    Reporter: sivabalan narayanan
>Assignee: sivabalan narayanan
>Priority: Major
>  Labels: pull-request-available
>
>  
>  
> |[https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=23953=logs=600e7de6-e133-5e69-e615-50ee129b3c08=bbbd7bcc-ae73-56b8-887a-cd2d6deaafc7]
> [https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=23953=logs=7601efb9-4019-552e-11ba-eb31b66593b2=d4b4e11d-8e26-50e5-a0d9-bb2d5decfeb9]
> org.apache.hudi.exception.HoodieMetadataException: Metadata table's 
> deltacommits exceeded 3: this is likely caused by a pending instant in the 
> data table. Resolve the pending instant or adjust 
> `hoodie.metadata.max.deltacommits.when_pending`, then restart the pipeline. 
> at 
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.checkNumDeltaCommits([HoodieBackedTableMetadataWriter.java:835|http://hoodiebackedtablemetadatawriter.java:835/])
>  at 
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.validateTimelineBeforeSchedulingCompaction([HoodieBackedTableMetadataWriter.java:1367|http://hoodiebackedtablemetadatawriter.java:1367/])
> java.lang.IllegalArgumentException: Following instants have timestamps >= 
> compactionInstant (002) Instants 
> :[[004__deltacommit__COMPLETED__20240515123806398]] at 
> org.apache.hudi.common.util.ValidationUtils.checkArgument([ValidationUtils.java:42|http://validationutils.java:42/])
>  at 
> org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute([ScheduleCompactionActionExecutor.java:108|http://schedulecompactionactionexecutor.java:108/])
> |



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7768) Fix failing tests for 0.15.0 release (async compaction and metadata num commits check)

2024-05-15 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7768:
-

 Summary: Fix failing tests for 0.15.0 release (async compaction 
and metadata num commits check)
 Key: HUDI-7768
 URL: https://issues.apache.org/jira/browse/HUDI-7768
 Project: Apache Hudi
  Issue Type: Improvement
  Components: tests-ci
Reporter: sivabalan narayanan


 

 
|[https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=23953=logs=600e7de6-e133-5e69-e615-50ee129b3c08=bbbd7bcc-ae73-56b8-887a-cd2d6deaafc7]
[https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=23953=logs=7601efb9-4019-552e-11ba-eb31b66593b2=d4b4e11d-8e26-50e5-a0d9-bb2d5decfeb9]
org.apache.hudi.exception.HoodieMetadataException: Metadata table's 
deltacommits exceeded 3: this is likely caused by a pending instant in the data 
table. Resolve the pending instant or adjust 
`hoodie.metadata.max.deltacommits.when_pending`, then restart the pipeline. at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.checkNumDeltaCommits([HoodieBackedTableMetadataWriter.java:835|http://hoodiebackedtablemetadatawriter.java:835/])
 at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.validateTimelineBeforeSchedulingCompaction([HoodieBackedTableMetadataWriter.java:1367|http://hoodiebackedtablemetadatawriter.java:1367/])



java.lang.IllegalArgumentException: Following instants have timestamps >= 
compactionInstant (002) Instants 
:[[004__deltacommit__COMPLETED__20240515123806398]] at 
org.apache.hudi.common.util.ValidationUtils.checkArgument([ValidationUtils.java:42|http://validationutils.java:42/])
 at 
org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor.execute([ScheduleCompactionActionExecutor.java:108|http://schedulecompactionactionexecutor.java:108/])

|
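For reference, a minimal sketch of relaxing the guard named in the first error 
above, per the message's own suggestion (the value 100 is an arbitrary 
illustrative assumption, not a recommended setting):

{code:java}
import java.util.Properties;

public class MetadataCompactionGuardExample {
  // Raise the pending-instant delta commit guard mentioned in the error message.
  static Properties relaxedMetadataGuard() {
    Properties props = new Properties();
    props.setProperty("hoodie.metadata.max.deltacommits.when_pending", "100");
    return props;
  }
}
{code}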



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7756) Audit all base file readers and replace w/ file slice readers

2024-05-14 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7756:
-

 Summary: Audit all base file readers and replace w/ file slice 
readers 
 Key: HUDI-7756
 URL: https://issues.apache.org/jira/browse/HUDI-7756
 Project: Apache Hudi
  Issue Type: Improvement
  Components: reader-core
Reporter: sivabalan narayanan


If the file slice reader is as performant as a base file reader when there are 
no log files, we should replace all base file readers with file slice readers, 
just so we unify both the COW and MOR code paths.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated: [HUDI-7654] Optimizing BQ sync for MDT (#11061)

2024-05-10 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new c75ae7f9ba2 [HUDI-7654] Optimizing BQ sync for MDT (#11061)
c75ae7f9ba2 is described below

commit c75ae7f9ba21bbf9ad9101415d632ed91af27712
Author: Sivabalan Narayanan 
AuthorDate: Fri May 10 09:57:02 2024 -0700

[HUDI-7654] Optimizing BQ sync for MDT (#11061)

* Optimizing BQ sync for MDT

* Adding tests
---
 .../hudi/sync/common/util/ManifestFileWriter.java  |  51 ++---
 .../utilities/TestManifestFileWriterSpark.java | 117 +
 2 files changed, 151 insertions(+), 17 deletions(-)

diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
index 2a15997ab21..3eaf80dddfe 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
@@ -19,13 +19,17 @@
 package org.apache.hudi.sync.common.util;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.fs.Path;
@@ -81,25 +85,14 @@ public class ManifestFileWriter {
 }
   }
 
+  @VisibleForTesting
   public static Stream 
fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient,
   boolean useFileListingFromMetadata, boolean useAbsolutePath) {
 try {
-  HoodieLocalEngineContext engContext = new 
HoodieLocalEngineContext(metaClient.getStorageConf());
-  HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(engContext, metaClient,
-  
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
-  
HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).build());
-  Stream allLatestBaseFiles;
-  if (useFileListingFromMetadata) {
-LOG.info("Fetching all base files from MDT.");
-fsView.loadAllPartitions();
-allLatestBaseFiles = fsView.getLatestBaseFiles();
-  } else {
-List partitions = FSUtils.getAllPartitionPaths(new 
HoodieLocalEngineContext(metaClient.getStorageConf()),
-metaClient.getBasePathV2().toString(), false);
-LOG.info("Retrieve all partitions from fs: {}", partitions.size());
-allLatestBaseFiles =  
partitions.parallelStream().flatMap(fsView::getLatestBaseFiles);
-  }
-  return allLatestBaseFiles.map(useAbsolutePath ? HoodieBaseFile::getPath 
: HoodieBaseFile::getFileName);
+  StorageConfiguration storageConf = metaClient.getStorageConf();
+  HoodieLocalEngineContext engContext = new 
HoodieLocalEngineContext(storageConf);
+  boolean canUseMetadataTable = useFileListingFromMetadata && 
metaClient.getTableConfig().isMetadataTableAvailable();
+  return getLatestBaseFiles(canUseMetadataTable, engContext, metaClient, 
useAbsolutePath);
 } catch (Exception e) {
   throw new HoodieException("Error in fetching latest base files.", e);
 }
@@ -109,6 +102,30 @@ public class ManifestFileWriter {
 return new StoragePath(metaClient.getMetaPath(), useAbsolutePath ? 
ABSOLUTE_PATH_MANIFEST_FOLDER_NAME : MANIFEST_FOLDER_NAME);
   }
 
+  @VisibleForTesting
+  static Stream getLatestBaseFiles(boolean canUseMetadataTable, 
HoodieEngineContext engContext, HoodieTableMetaClient metaClient,
+   boolean useAbsolutePath) {
+List partitions = FSUtils.getAllPartitionPaths(engContext, 
metaClient.getBasePath(), canUseMetadataTable);
+LOG.info("Retrieve all partitions: " + partitions.size());
+HoodieTableFileSystemView fsView = null;
+try {
+  fsView = 
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engContext, 
metaClient,
+  
HoodieMetadataConfig.newBuilder().enable(canUseMetadataTable).build(),
+  
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
+   

[jira] [Created] (HUDI-7716) Add more logs around index lookup

2024-05-06 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7716:
-

 Summary: Add more logs around index lookup
 Key: HUDI-7716
 URL: https://issues.apache.org/jira/browse/HUDI-7716
 Project: Apache Hudi
  Issue Type: Improvement
  Components: index
Reporter: sivabalan narayanan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7712) Account for file slices instead of just base files while initializing RLI for MOR table

2024-05-05 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7712:
-

 Summary: Account for file slices instead of just base files while 
initializing RLI for MOR table
 Key: HUDI-7712
 URL: https://issues.apache.org/jira/browse/HUDI-7712
 Project: Apache Hudi
  Issue Type: Bug
  Components: metadata
Reporter: sivabalan narayanan


We could have deletes in log files, and hence we need to account for the entire 
file slice instead of just base files while initializing RLI for a MOR table 
(see the sketch below). 
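A minimal sketch of the difference, assuming an already initialized 
HoodieTableFileSystemView; this is an illustration of the idea, not the actual 
RLI bootstrap code:

{code:java}
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

public class RliBootstrapSketch {
  // Listing only base files would miss record-level deletes sitting in log files,
  // so for MOR the bootstrap should iterate complete file slices (base file + log files).
  static List<FileSlice> latestFileSlices(HoodieTableFileSystemView fsView, String partitionPath) {
    return fsView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
  }
}
{code}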



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (HUDI-7673) Enhance RLI validation w/ MDT validator for false positives

2024-05-05 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7673:
-

Assignee: sivabalan narayanan

> Enhance RLI validation w/ MDT validator for false positives
> ---
>
> Key: HUDI-7673
> URL: https://issues.apache.org/jira/browse/HUDI-7673
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: metadata
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>  Labels: pull-request-available
>
> There is a chance that we could see false positive failures w/ MDT validation 
> when RLI is validated. 
>  
> When FS-based record key locations are polled, we could have a pending 
> commit, and when MDT is polled for record locations, the commit could have 
> been completed. And so, RLI validation could return additional record 
> locations.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (HUDI-7687) Instant should not be archived until replaced file groups or older file versions are deleted

2024-04-29 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7687:
-

Assignee: sivabalan narayanan

> Instant should not be archived until replaced file groups or older file 
> versions are deleted
> 
>
> Key: HUDI-7687
> URL: https://issues.apache.org/jira/browse/HUDI-7687
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Krishen Bhan
>    Assignee: sivabalan narayanan
>Priority: Minor
>  Labels: archive, clean
>
> When archival runs it may consider an instant as a candidate for archival 
> even if the file groups said instant replaced/updated still need to undergo a 
> `clean`. For example, consider the following scenario with clean and archived 
> scheduled/executed independently in different jobs
>  # Insert at C1 creates file group f1 in partition
>  # Replacecommit at RC2 creates file group f2 in partition, and replaces f1
>  # Any reader of partition that calls HUDI API (with or without using MDT) 
> will recognize that f1 should be ignored, as it has been replaced. This is 
> since RC2 instant file is in active timeline
>  # Some more instants are added to timeline. RC2 is now eligible to be 
> cleaned (as per the table writers' clean policy). Assume though that file 
> groups replaces by RC2 haven't been deleted yet, such as due to clean 
> repeatedly failing, async clean not being scheduled yet, or the clean failing 
> to delete said file groups.
>  # An archive job eventually is triggered, and archives C1 and RC2. Note that 
> f1 is still in partition
> Now the table has the same consistency issue as seen in 
> https://issues.apache.org/jira/browse/HUDI-7655 , where replaced file groups 
> are still in partition and readers may see inconsistent data. 
>  
> This situation can be avoided by ensuring that archival will "block" and not 
> go past an older instant time if it sees that said instant hasn't undergone a 
> clean yet. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (HUDI-7655) Support configuration for clean to fail execution if there is at least one file is marked as a failed delete

2024-04-29 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7655:
-

Assignee: sivabalan narayanan

> Support configuration for clean to fail execution if there is at least one 
> file is marked as a failed delete
> 
>
> Key: HUDI-7655
> URL: https://issues.apache.org/jira/browse/HUDI-7655
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Krishen Bhan
>    Assignee: sivabalan narayanan
>Priority: Minor
>  Labels: clean
>
> When a HUDI clean plan is executed, any targeted file that was not confirmed 
> as deleted (or non-existing) will be marked as a "failed delete". Although 
> these failed deletes will be added to `.clean` metadata, if incremental clean 
> is used then these files might not ever be picked up again as a future clean 
> plan, unless a "full-scan" clean ends up being scheduled. In addition to 
> leading to more files unnecessarily taking up storage space for longer, this 
> can lead to the following dataset consistency issue for COW datasets:
>  # Insert at C1 creates file group f1 in partition
>  # Replacecommit at RC2 creates file group f2 in partition, and replaces f1
>  # Any reader of partition that calls HUDI API (with or without using MDT) 
> will recognize that f1 should be ignored, as it has been replaced. This is 
> since RC2 instant file is in active timeline
>  # Some completed instants later an incremental clean is scheduled. It moves 
> the "earliest commit to retain" to an time after instant time RC2, so it 
> targets f1 for deletion. But during execution of the plan, it fails to delete 
> f1.
>  # An archive job eventually is triggered, and archives C1 and RC2. Note that 
> f1 is still in partition
> At this point, any job/query that reads the aforementioned partition directly 
> from the DFS file system calls (without directly using MDT FILES partition) 
> will consider both f1 and f2 as valid file groups, since RC2 is no longer in 
> active timeline. This is a data consistency issue, and will only be resolved 
> if a "full-scan" clean is triggered and deletes f1.
> This specific scenario can be avoided if the user can configure HUDI clean to 
> fail execution of a clean plan unless all files are confirmed as deleted (or 
> not existing in DFS already), "blocking" the clean. The next clean attempt 
> will re-execute this existing plan, since clean plans cannot be "rolled 
> back". 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7673) Enhance RLI validation w/ MDT validator for false positives

2024-04-25 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7673:
-

 Summary: Enhance RLI validation w/ MDT validator for false 
positives
 Key: HUDI-7673
 URL: https://issues.apache.org/jira/browse/HUDI-7673
 Project: Apache Hudi
  Issue Type: Improvement
  Components: metadata
Reporter: sivabalan narayanan


There is a chance that we could see false positive failures w/ MDT validation 
when RLI is validated. 

 

When FS-based record key locations are polled, we could have a pending commit, 
and when MDT is polled for record locations, the commit could have been 
completed. And so, RLI validation could return additional record locations. One 
possible mitigation is sketched below.
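A minimal sketch of that mitigation, under the assumption that a mismatch is 
re-checked against a fresh file-system lookup before being reported (all names 
here are hypothetical placeholders, not the validator's actual APIs):

{code:java}
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

public class RliRevalidationSketch {
  static boolean isRealMismatch(String recordKey,
                                Map<String, String> fsLocations,   // record key -> file id from FS listing
                                Map<String, String> rliLocations,  // record key -> file id from RLI
                                Function<String, String> refreshFsLookup) {
    String fsLoc = fsLocations.get(recordKey);
    String rliLoc = rliLocations.get(recordKey);
    if (Objects.equals(fsLoc, rliLoc)) {
      return false; // both sides agree, nothing to report
    }
    // The original FS snapshot may predate a commit that completed before the RLI
    // lookup; re-resolve from the file system once before flagging an error.
    String refreshed = refreshFsLookup.apply(recordKey);
    return !Objects.equals(refreshed, rliLoc);
  }
}
{code}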

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7659) Update 0.14.0 release docs to call out that row writer w/ clustering is enabled by default

2024-04-23 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7659:
-

 Summary: Update 0.14.0 release docs to call out that row writer w/ 
clustering is enabled by default
 Key: HUDI-7659
 URL: https://issues.apache.org/jira/browse/HUDI-7659
 Project: Apache Hudi
  Issue Type: Improvement
  Components: docs
Reporter: sivabalan narayanan


Update 0.14.0 release docs to call out that row writer w/ clustering is enabled 
by default

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated: [HUDI-7655] Minor fix to rli validation with MDT validator (#11060)

2024-04-21 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 0dbeb77b92a [HUDI-7655] Minor fix to rli validation with MDT validator 
(#11060)
0dbeb77b92a is described below

commit 0dbeb77b92a786ed4c4e728533e61927b93de70b
Author: Sivabalan Narayanan 
AuthorDate: Sun Apr 21 11:19:45 2024 -0700

[HUDI-7655] Minor fix to rli validation with MDT validator (#11060)
---
 .../org/apache/hudi/utilities/HoodieMetadataTableValidator.java   | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 75ff9a41fc0..0e9820a5c9b 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -959,6 +959,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 int numErrorSamples = cfg.numRecordIndexErrorSamples;
 Pair> result = 
keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, 
cfg.recordIndexParallelism)
 .map(e -> {
+  String recordKey = e._1;
   Optional> locationOnFs = e._2._1;
   Optional> locationFromRecordIndex = e._2._2;
   List errorSampleList = new ArrayList<>();
@@ -967,13 +968,13 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 && 
locationOnFs.get().getRight().equals(locationFromRecordIndex.get().getRight())) 
{
   return Pair.of(0L, errorSampleList);
 }
-errorSampleList.add(constructLocationInfoString(locationOnFs, 
locationFromRecordIndex));
+errorSampleList.add(constructLocationInfoString(recordKey, 
locationOnFs, locationFromRecordIndex));
 return Pair.of(1L, errorSampleList);
   }
   if (!locationOnFs.isPresent() && 
!locationFromRecordIndex.isPresent()) {
 return Pair.of(0L, errorSampleList);
   }
-  errorSampleList.add(constructLocationInfoString(locationOnFs, 
locationFromRecordIndex));
+  errorSampleList.add(constructLocationInfoString(recordKey, 
locationOnFs, locationFromRecordIndex));
   return Pair.of(1L, errorSampleList);
 })
 .reduce((pair1, pair2) -> {
@@ -1030,9 +1031,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 }
   }
 
-  private String constructLocationInfoString(Optional> 
locationOnFs,
+  private String constructLocationInfoString(String recordKey, 
Optional> locationOnFs,
  Optional> 
locationFromRecordIndex) {
 StringBuilder sb = new StringBuilder();
+sb.append("Record key " + recordKey + " -> ");
 sb.append("FS: ");
 if (locationOnFs.isPresent()) {
   sb.append(locationOnFs.get());



[jira] [Created] (HUDI-7645) Optimize BQ sync tool for MDT

2024-04-20 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7645:
-

 Summary: Optimize BQ sync tool for MDT
 Key: HUDI-7645
 URL: https://issues.apache.org/jira/browse/HUDI-7645
 Project: Apache Hudi
  Issue Type: Improvement
  Components: meta-sync
Reporter: sivabalan narayanan


Looks like in the BQ sync, we are polling the file system view for the latest 
files sequentially for every partition. 

 

When MDT is enabled, we could load all partitions in one call, as sketched 
below. 
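A minimal sketch of that idea, reusing the same calls that already appear in the 
sync path (setup of metaClient is assumed; this is an illustration, not the 
final fix):

{code:java}
import java.util.stream.Stream;

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;

public class BqSyncListingSketch {
  static Stream<String> latestBaseFilePaths(HoodieTableMetaClient metaClient) {
    HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
    HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient,
        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
        HoodieMetadataConfig.newBuilder().enable(true).build());
    // One bulk load of every partition from the metadata table, then a single pass
    // over the latest base files, instead of polling the view per partition.
    fsView.loadAllPartitions();
    return fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath);
  }
}
{code}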

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7644) Add record key info with RLI validation in MDT Validator

2024-04-20 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7644:
--
Fix Version/s: 1.0.0

> Add record key info with RLI validation in MDT Validator
> 
>
> Key: HUDI-7644
> URL: https://issues.apache.org/jira/browse/HUDI-7644
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: metadata, tests-ci
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
> Fix For: 0.15.0, 1.0.0
>
>
> Add record key info with RLI validation in MDT Validator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7644) Add record key info with RLI validation in MDT Validator

2024-04-20 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7644:
--
Fix Version/s: 0.15.0

> Add record key info with RLI validation in MDT Validator
> 
>
> Key: HUDI-7644
> URL: https://issues.apache.org/jira/browse/HUDI-7644
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: metadata, tests-ci
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
> Fix For: 0.15.0
>
>
> Add record key info with RLI validation in MDT Validator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (HUDI-7644) Add record key info with RLI validation in MDT Validator

2024-04-20 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7644:
-

Assignee: sivabalan narayanan

> Add record key info with RLI validation in MDT Validator
> 
>
> Key: HUDI-7644
> URL: https://issues.apache.org/jira/browse/HUDI-7644
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: metadata, tests-ci
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>
> Add record key info with RLI validation in MDT Validator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7644) Add record key info with RLI validation in MDT Validator

2024-04-20 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7644:
-

 Summary: Add record key info with RLI validation in MDT Validator
 Key: HUDI-7644
 URL: https://issues.apache.org/jira/browse/HUDI-7644
 Project: Apache Hudi
  Issue Type: Improvement
  Components: metadata, tests-ci
Reporter: sivabalan narayanan


Add record key info with RLI validation in MDT Validator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7641) Add metrics to track what partitions are enabled in MDT

2024-04-19 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7641:
--
Fix Version/s: 0.15.0

> Add metrics to track what partitions are enabled in MDT
> ---
>
> Key: HUDI-7641
> URL: https://issues.apache.org/jira/browse/HUDI-7641
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: metadata
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (HUDI-7641) Add metrics to track what partitions are enabled in MDT

2024-04-19 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7641:
-

Assignee: sivabalan narayanan

> Add metrics to track what partitions are enabled in MDT
> ---
>
> Key: HUDI-7641
> URL: https://issues.apache.org/jira/browse/HUDI-7641
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: metadata
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated: [HUDI-7618] Add ability to ignore checkpoints in delta streamer (#11018)

2024-04-19 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new ca77fda51fe [HUDI-7618] Add ability to ignore checkpoints in delta 
streamer (#11018)
ca77fda51fe is described below

commit ca77fda51fe3036f86d4ddb8b0e58a2f160882dc
Author: Sampan S Nayak 
AuthorDate: Fri Apr 19 11:55:43 2024 +0530

[HUDI-7618] Add ability to ignore checkpoints in delta streamer (#11018)
---
 .../hudi/utilities/streamer/HoodieStreamer.java|  7 +++
 .../apache/hudi/utilities/streamer/StreamSync.java | 13 -
 .../streamer/TestStreamSyncUnitTests.java  | 61 ++
 3 files changed, 79 insertions(+), 2 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 59c1bf3d164..0dd488bffcb 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -428,6 +428,13 @@ public class HoodieStreamer implements Serializable {
 @Parameter(names = {"--config-hot-update-strategy-class"}, description = 
"Configuration hot update in continuous mode")
 public String configHotUpdateStrategyClass = "";
 
+@Parameter(names = {"--ignore-checkpoint"}, description = "Set this config 
with a unique value, recommend using a timestamp value or UUID."
++ " Setting this config indicates that the subsequent sync should 
ignore the last committed checkpoint for the source. The config value is stored"
++ " in the commit history, so setting the config with same values 
would not have any affect. This config can be used in scenarios like kafka 
topic change,"
++ " where we would want to start ingesting from the latest or earliest 
offset after switching the topic (in this case we would want to ignore the 
previously"
++ " committed checkpoint, and rely on other configs to pick the 
starting offsets).")
+public String ignoreCheckpoint = null;
+
 public boolean isAsyncCompactionEnabled() {
   return continuousMode && !forceDisableCompaction
   && 
HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index c9521058b12..2f5bd1fd3ff 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -164,6 +164,7 @@ public class StreamSync implements Serializable, Closeable {
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class);
   private static final String NULL_PLACEHOLDER = "[null]";
+  public static final String CHECKPOINT_IGNORE_KEY = 
"deltastreamer.checkpoint.ignore_key";
 
   /**
* Delta Sync Config.
@@ -733,7 +734,8 @@ public class StreamSync implements Serializable, Closeable {
* @return the checkpoint to resume from if applicable.
* @throws IOException
*/
-  private Option getCheckpointToResume(Option 
commitsTimelineOpt) throws IOException {
+  @VisibleForTesting
+  Option getCheckpointToResume(Option 
commitsTimelineOpt) throws IOException {
 Option resumeCheckpointStr = Option.empty();
 // try get checkpoint from commits(including commit and deltacommit)
 // in COW migrating to MOR case, the first batch of the deltastreamer will 
lost the checkpoint from COW table, cause the dataloss
@@ -750,7 +752,11 @@ public class StreamSync implements Serializable, Closeable 
{
   if (commitMetadataOption.isPresent()) {
 HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
 LOG.debug("Checkpoint reset from metadata: " + 
commitMetadata.getMetadata(CHECKPOINT_RESET_KEY));
-if (cfg.checkpoint != null && 
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
+if (cfg.ignoreCheckpoint != null && 
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY))
+|| 
!cfg.ignoreCheckpoint.equals(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY
 {
+  // we ignore any existing checkpoint and start ingesting afresh
+  resumeCheckpointStr = Option.empty();
+} else if (cfg.checkpoint != null && 
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
 || 
!cfg.checkpoint.equals(c

[jira] [Created] (HUDI-7641) Add metrics to track what partitions are enabled in MDT

2024-04-18 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7641:
-

 Summary: Add metrics to track what partitions are enabled in MDT
 Key: HUDI-7641
 URL: https://issues.apache.org/jira/browse/HUDI-7641
 Project: Apache Hudi
  Issue Type: Improvement
  Components: metadata
Reporter: sivabalan narayanan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7624) Fix index lookup duration to track tag location duration

2024-04-16 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7624:
-

 Summary: Fix index lookup duration to track tag location duration
 Key: HUDI-7624
 URL: https://issues.apache.org/jira/browse/HUDI-7624
 Project: Apache Hudi
  Issue Type: Bug
  Components: index
Reporter: sivabalan narayanan


With Spark's lazy evaluation, we can't simply start a timer before the 
tagLocation call and end it afterwards; that may not give us the right value for 
the tag location duration. So, we need to fix how the duration is measured (see 
the sketch below).
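A minimal sketch of the pitfall and one way around it (JavaRDD here stands in 
for whatever the tagged records end up as; this is illustrative only, not the 
actual fix):

{code:java}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

public class TagLocationTimingSketch {
  // tagLocation() only builds the lazy DAG; unless the result is materialized,
  // the timer mostly measures transformation setup, not the index lookup itself.
  static <T> long timeTaggedLookup(JavaRDD<T> taggedRecords) {
    long start = System.currentTimeMillis();
    taggedRecords.persist(StorageLevel.MEMORY_AND_DISK());
    taggedRecords.count(); // force evaluation so the interval covers the real work
    return System.currentTimeMillis() - start;
  }
}
{code}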



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated: [HUDI-7606] Unpersist RDDs after table services, mainly compaction and clustering (#11000)

2024-04-14 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 48952ae5dfd [HUDI-7606] Unpersist RDDs after table services, mainly 
compaction and clustering (#11000)
48952ae5dfd is described below

commit 48952ae5dfd82f84cc338886e653cf1412e8cda8
Author: Rajesh Mahindra <76502047+rmahindra...@users.noreply.github.com>
AuthorDate: Sun Apr 14 14:38:55 2024 -0700

[HUDI-7606] Unpersist RDDs after table services, mainly compaction and 
clustering (#11000)


-

Co-authored-by: rmahindra123 
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 12 
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +-
 .../hudi/client/SparkRDDTableServiceClient.java|  6 ++
 .../apache/hudi/client/SparkRDDWriteClient.java| 21 +--
 .../hudi/client/utils/SparkReleaseResources.java   | 64 ++
 5 files changed, 85 insertions(+), 20 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 22f1a9995bd..228eaf4d554 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -333,6 +333,7 @@ public abstract class BaseHoodieTableServiceClient 
extends BaseHoodieCl
   CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
 } finally {
   this.txnManager.endTransaction(Option.of(compactionInstant));
+  releaseResources(compactionCommitTime);
 }
 WriteMarkersFactory.get(config.getMarkersType(), table, 
compactionCommitTime)
 .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -393,6 +394,7 @@ public abstract class BaseHoodieTableServiceClient 
extends BaseHoodieCl
   CompactHelpers.getInstance().completeInflightLogCompaction(table, 
logCompactionCommitTime, metadata);
 } finally {
   this.txnManager.endTransaction(Option.of(logCompactionInstant));
+  releaseResources(logCompactionCommitTime);
 }
 WriteMarkersFactory.get(config.getMarkersType(), table, 
logCompactionCommitTime)
 .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -534,6 +536,7 @@ public abstract class BaseHoodieTableServiceClient 
extends BaseHoodieCl
   throw new HoodieClusteringException("unable to transition clustering 
inflight to complete: " + clusteringCommitTime, e);
 } finally {
   this.txnManager.endTransaction(Option.of(clusteringInstant));
+  releaseResources(clusteringCommitTime);
 }
 WriteMarkersFactory.get(config.getMarkersType(), table, 
clusteringCommitTime)
 .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -779,6 +782,7 @@ public abstract class BaseHoodieTableServiceClient 
extends BaseHoodieCl
   + " Earliest Retained Instant :" + 
metadata.getEarliestCommitToRetain()
   + " cleanerElapsedMs" + durationMs);
 }
+releaseResources(cleanInstantTime);
 return metadata;
   }
 
@@ -1171,4 +1175,12 @@ public abstract class BaseHoodieTableServiceClient extends BaseHoodieCl
   }
 }
   }
+
+  /**
+   * Called after each commit of a compaction or clustering table service,
+   * to release any resources used.
+   */
+  protected void releaseResources(String instantTime) {
+// do nothing here
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 9ade694d340..2b9bc83091b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -236,11 +236,11 @@ public abstract class BaseHoodieWriteClient 
extends BaseHoodieClient
   commit(table, commitActionType, instantTime, metadata, stats, 
writeStatuses);
   postCommit(table, metadata, instantTime, extraMetadata);
   LOG.info("Committed " + instantTime);
-  releaseResources(instantTime);
 } catch (IOException e) {
   throw new HoodieCommitException("Failed to complete commit " + 
config.getBasePath() + " at time " + instantTime, e);
 } finally {
   this.txnManager.endTransaction(Option.of(inflightInstant));
+  releaseResources(instantTime);
 }
 
 // trigger clean and archival.
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/Spa

[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-04-03 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7507:
--
Fix Version/s: 1.0.0

>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: table-service
>Reporter: Krishen Bhan
>Priority: Major
> Fix For: 0.15.0, 1.0.0
>
> Attachments: Flowchart (1).png, Flowchart.png
>
>
> Although HUDI operations hold a table lock when creating a .requested 
> instant, because HUDI writers do not generate a timestamp and create a 
> .requested plan in the same transaction, there can be a scenario where 
>  # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp 
> (x - 1)
>  # Job 1 schedules and creates requested file with instant timestamp (x)
>  # Job 2 schedules and creates requested file with instant timestamp (x-1)
>  # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can 
> cause issues:
>  * 
>  ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
> if Job 1 runs before Job 2, it can create a compaction plan for all instant 
> times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 
> will create instant time (x-1), but the timeline will be in a corrupted state 
> since the compaction plan was supposed to include (x-1)
>  ** There is a similar issue with clean. If Job2 is a long-running commit 
> (that was stuck/delayed for a while before creating its .requested plan) and 
> Job 1 is a clean, then Job 1 can perform a clean that updates the 
> earliest-commit-to-retain without waiting for the inflight instant by Job 2 
> at (x-1) to complete. This causes Job2 to be "skipped" by clean.
> [Edit] I added a diagram to visualize the issue, specifically the second 
> scenario with clean
> !Flowchart (1).png!
>  
> One way this can be resolved is by combining the operations of generating 
> instant time and creating a requested file in the same HUDI table 
> transaction. Specifically, executing the following steps whenever any instant 
> (commit, table service, etc) is scheduled
>  # Acquire table lock
>  # Look at the latest instant C on the active timeline (completed or not). 
> Generate a timestamp after C
>  # Create the plan and requested file using this new timestamp ( that is 
> greater than C)
>  # Release table lock
> Unfortunately this has the following drawbacks
>  * Every operation must now hold the table lock when computing its plan, even 
> if it's an expensive operation and will take a while
>  * Users of HUDI cannot easily set their own instant time of an operation, 
> and this restriction would break any public APIs that allow this
> -An alternate approach (suggested by- [~pwason] -) was to instead have all 
> operations including table services perform conflict resolution checks before 
> committing. For example, clean and compaction would generate their plan as 
> usual. But when creating a transaction to write a .requested file, right 
> before creating the file they should check if another lower timestamp instant 
> has appeared in the timeline. And if so, they should fail/abort without 
> creating the plan. Commit operations would also be updated/verified to have 
> similar check, before creating a .requested file (during a transaction) the 
> commit operation will check if a table service plan (clean/compact) with a 
> greater instant time has been created. And if so, would abort/fail. This 
> avoids the drawbacks of the first approach, but will lead to more transient 
> failures that users have to handle.-
>  
> An alternate approach is to have every operation abort creating a .requested 
> file unless it has the latest timestamp. Specifically, for any instant type, 
> whenever an operation is about to create a .requested plan on timeline, it 
> should take the table lock and assert that there are no other instants on 
> timeline (inflight or otherwise) that are greater than it. If that assertion 
> fails, then throw a retry-able conflict resolution exception.
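
To make the first approach above concrete, a minimal sketch of generating the instant time and creating the .requested file under one table-level lock. The ReentrantLock and the supplier/consumer parameters are stand-ins for Hudi's lock provider and timeline APIs, not the real classes; the later alternative in the description would instead throw a retryable conflict exception rather than regenerating the timestamp:

import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;

class GuardedScheduler {
  private final ReentrantLock tableLock = new ReentrantLock(); // stand-in for the table lock

  String scheduleWithGuard(Supplier<String> latestInstantOnTimeline,
                           Supplier<String> timestampGenerator,
                           Consumer<String> createRequestedFile) {
    tableLock.lock();
    try {
      // Latest instant C on the active timeline, completed or not (null if none).
      String latest = latestInstantOnTimeline.get();
      String newInstant = timestampGenerator.get();
      // Keep the new timestamp strictly greater than C so no concurrent writer can
      // later slot in a smaller instant time ahead of this plan.
      while (latest != null && newInstant.compareTo(latest) <= 0) {
        newInstant = timestampGenerator.get();
      }
      // Plan + .requested file are written while still holding the lock.
      createRequestedFile.accept(newInstant);
      return newInstant;
    } finally {
      tableLock.unlock();
    }
  }
}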



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-04-03 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7507:
--
Fix Version/s: 0.15.0

>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: table-service
>Reporter: Krishen Bhan
>Priority: Major
> Fix For: 0.15.0
>
> Attachments: Flowchart (1).png, Flowchart.png
>
>
> Although HUDI operations hold a table lock when creating a .requested 
> instant, because HUDI writers do not generate a timestamp and create a 
> .requested plan in the same transaction, there can be a scenario where 
>  # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp 
> (x - 1)
>  # Job 1 schedules and creates requested file with instant timestamp (x)
>  # Job 2 schedules and creates requested file with instant timestamp (x-1)
>  # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can 
> cause issues:
>  * 
>  ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
> if Job 1 runs before Job 2, it can create a compaction plan for all instant 
> times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 
> will create instant time (x-1), but the timeline will be in a corrupted state 
> since the compaction plan was supposed to include (x-1)
>  ** There is a similar issue with clean. If Job2 is a long-running commit 
> (that was stuck/delayed for a while before creating its .requested plan) and 
> Job 1 is a clean, then Job 1 can perform a clean that updates the 
> earliest-commit-to-retain without waiting for the inflight instant by Job 2 
> at (x-1) to complete. This causes Job2 to be "skipped" by clean.
> [Edit] I added a diagram to visualize the issue, specifically the second 
> scenario with clean
> !Flowchart (1).png!
>  
> One way this can be resolved is by combining the operations of generating 
> instant time and creating a requested file in the same HUDI table 
> transaction. Specifically, executing the following steps whenever any instant 
> (commit, table service, etc) is scheduled
>  # Acquire table lock
>  # Look at the latest instant C on the active timeline (completed or not). 
> Generate a timestamp after C
>  # Create the plan and requested file using this new timestamp ( that is 
> greater than C)
>  # Release table lock
> Unfortunately this has the following drawbacks
>  * Every operation must now hold the table lock when computing its plan, even 
> if it's an expensive operation and will take a while
>  * Users of HUDI cannot easily set their own instant time of an operation, 
> and this restriction would break any public APIs that allow this
> -An alternate approach (suggested by- [~pwason] -) was to instead have all 
> operations including table services perform conflict resolution checks before 
> committing. For example, clean and compaction would generate their plan as 
> usual. But when creating a transaction to write a .requested file, right 
> before creating the file they should check if another lower timestamp instant 
> has appeared in the timeline. And if so, they should fail/abort without 
> creating the plan. Commit operations would also be updated/verified to have 
> similar check, before creating a .requested file (during a transaction) the 
> commit operation will check if a table service plan (clean/compact) with a 
> greater instant time has been created. And if so, would abort/fail. This 
> avoids the drawbacks of the first approach, but will lead to more transient 
> failures that users have to handle.-
>  
> An alternate approach is to have every operation abort creating a .requested 
> file unless it has the latest timestamp. Specifically, for any instant type, 
> whenever an operation is about to create a .requested plan on timeline, it 
> should take the table lock and assert that there are no other instants on 
> timeline (inflight or otherwise) that are greater than it. If that assertion 
> fails, then throw a retry-able conflict resolution exception.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (HUDI-4699) Primary key-less data model

2024-04-01 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-4699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan resolved HUDI-4699.
---

> Primary key-less data model
> ---
>
> Key: HUDI-4699
> URL: https://issues.apache.org/jira/browse/HUDI-4699
> Project: Apache Hudi
>  Issue Type: Epic
>  Components: writer-core
>Reporter: Sagar Sumit
>Priority: Major
>  Labels: pull-request-available
>
> Hudi requires users to specify a primary key field. Can we do away with this 
> requirement? This epic tracks the work to support use cases which does not 
> require primary key based data modelling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (HUDI-4699) Primary key-less data model

2024-04-01 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-4699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan closed HUDI-4699.
-
Fix Version/s: 0.14.0
   Resolution: Fixed

> Primary key-less data model
> ---
>
> Key: HUDI-4699
> URL: https://issues.apache.org/jira/browse/HUDI-4699
> Project: Apache Hudi
>  Issue Type: Epic
>  Components: writer-core
>Reporter: Sagar Sumit
>    Assignee: sivabalan narayanan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.14.0
>
>
> Hudi requires users to specify a primary key field. Can we do away with this 
> requirement? This epic tracks the work to support use cases which does not 
> require primary key based data modelling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (HUDI-4699) Primary key-less data model

2024-04-01 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-4699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reopened HUDI-4699:
---
Assignee: sivabalan narayanan

> Primary key-less data model
> ---
>
> Key: HUDI-4699
> URL: https://issues.apache.org/jira/browse/HUDI-4699
> Project: Apache Hudi
>  Issue Type: Epic
>  Components: writer-core
>Reporter: Sagar Sumit
>    Assignee: sivabalan narayanan
>Priority: Major
>  Labels: pull-request-available
>
> Hudi requires users to specify a primary key field. Can we do away with this 
> requirement? This epic tracks the work to support use cases which does not 
> require primary key based data modelling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated: [HUDI-7557] Fix incremental cleaner when commit for savepoint removed (#10946)

2024-04-01 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 9efced37f81 [HUDI-7557] Fix incremental cleaner when commit for 
savepoint removed (#10946)
9efced37f81 is described below

commit 9efced37f819ae59b51099ee43dc75e1a876a855
Author: Sagar Sumit 
AuthorDate: Mon Apr 1 23:00:19 2024 +0530

[HUDI-7557] Fix incremental cleaner when commit for savepoint removed 
(#10946)
---
 .../hudi/table/action/clean/CleanPlanner.java  |  1 +
 .../apache/hudi/table/action/TestCleanPlanner.java | 89 --
 2 files changed, 51 insertions(+), 39 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 48ec8f9baa1..753f8c8253d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -245,6 +245,7 @@ public class CleanPlanner implements 
Serializable {
   Option instantOption = 
hoodieTable.getCompletedCommitsTimeline().filter(instant -> 
instant.getTimestamp().equals(savepointCommit)).firstInstant();
   if (!instantOption.isPresent()) {
 LOG.warn("Skipping to process a commit for which savepoint was removed 
as the instant moved to archived timeline already");
+return Stream.empty();
   }
   HoodieInstant instant = instantOption.get();
   return getPartitionsForInstants(instant);
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
index 8052572fcea..9989273b723 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
@@ -138,14 +138,14 @@ public class TestCleanPlanner {
   void testPartitionsForIncrCleaning(HoodieWriteConfig config, String 
earliestInstant,
  String lastCompletedTimeInLastClean, 
String lastCleanInstant, String earliestInstantsInLastClean, List 
partitionsInLastClean,
  Map> 
savepointsTrackedInLastClean, Map> 
activeInstantsPartitions,
- Map> savepoints, 
List expectedPartitions) throws IOException {
+ Map> savepoints, 
List expectedPartitions, boolean areCommitsForSavepointsRemoved) throws 
IOException {
 HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
 when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline);
 // setup savepoint mocks
 Set savepointTimestamps = 
savepoints.keySet().stream().collect(Collectors.toSet());
 
when(mockHoodieTable.getSavepointTimestamps()).thenReturn(savepointTimestamps);
 if (!savepoints.isEmpty()) {
-  for (Map.Entry> entry: savepoints.entrySet()) {
+  for (Map.Entry> entry : savepoints.entrySet()) {
 Pair> 
savepointMetadataOptionPair = getSavepointMetadata(entry.getValue());
 HoodieInstant instant = new HoodieInstant(false, 
HoodieTimeline.SAVEPOINT_ACTION, entry.getKey());
 
when(activeTimeline.getInstantDetails(instant)).thenReturn(savepointMetadataOptionPair.getRight());
@@ -156,7 +156,7 @@ public class TestCleanPlanner {
 Pair> cleanMetadataOptionPair =
 getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant, 
earliestInstantsInLastClean, lastCompletedTimeInLastClean, 
savepointsTrackedInLastClean.keySet());
 mockLastCleanCommit(mockHoodieTable, lastCleanInstant, 
earliestInstantsInLastClean, activeTimeline, cleanMetadataOptionPair);
-mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, 
savepointsTrackedInLastClean);
+mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, 
savepointsTrackedInLastClean, areCommitsForSavepointsRemoved);
 
 // Trigger clean and validate partitions to clean.
 CleanPlanner cleanPlanner = new CleanPlanner<>(context, 
mockHoodieTable, config);
@@ -332,7 +332,7 @@ public class TestCleanPlanner {
 
   static Stream keepLatestByHoursOrCommitsArgsIncrCleanPartitions() 
{
 String earliestInstant = "20231204194919610";
-String earliestInstantPlusTwoDays =  "20231206194919610";
+String earliestInstantPlusTwoDays = "20231206194919610";
 String lastCleanInstant = earliestInstantPlusTwoDays;
 String earliestInstantMinusThreeDays = "20231201194919610";
 String e

[jira] [Created] (HUDI-7556) Fix MDT validator to account for additional partitions in MDT

2024-03-29 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7556:
-

 Summary: Fix MDT validator to account for additional partitions in 
MDT
 Key: HUDI-7556
 URL: https://issues.apache.org/jira/browse/HUDI-7556
 Project: Apache Hudi
  Issue Type: Bug
  Components: metadata
Reporter: sivabalan narayanan


There is a chance that MDT could list additional partitions when compared to FS 
based listing. 

The reason is: 

We load the active timeline from the meta client and poll the FS-based listing for 
completed commits. And then we poll the MDT for the list of all partitions. In between 
these two steps, a commit could have completed, and hence the MDT could be 
serving that as well. So, let's account for that in our validation tool. 
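
A rough sketch of such a tolerance check (plain Sets; the names fsPartitions, mdtPartitions and commitsCompletedAfterSnapshot are illustrative, not the validator's real fields):

import java.util.HashSet;
import java.util.Set;

final class MdtPartitionCheck {
  // Extra partitions reported only by the MDT are acceptable when the timeline
  // moved (a commit completed) between the FS listing and the MDT poll.
  static boolean isMismatchAcceptable(Set<String> fsPartitions,
                                      Set<String> mdtPartitions,
                                      boolean commitsCompletedAfterSnapshot) {
    Set<String> onlyInMdt = new HashSet<>(mdtPartitions);
    onlyInMdt.removeAll(fsPartitions);
    return onlyInMdt.isEmpty() || commitsCompletedAfterSnapshot;
  }
}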



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated: [HUDI-7500] fix gaps with deduce schema and null schema (#10858)

2024-03-27 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 136d0755ad7 [HUDI-7500] fix gaps with deduce schema and null schema 
(#10858)
136d0755ad7 is described below

commit 136d0755ad7f86d2f4b7b4813a59096c233055e7
Author: Jon Vexler 
AuthorDate: Wed Mar 27 17:27:27 2024 -0400

[HUDI-7500] fix gaps with deduce schema and null schema (#10858)


-

Co-authored-by: Jonathan Vexler <=>
---
 .../main/scala/org/apache/hudi/DefaultSource.scala |   7 +-
 .../utilities/streamer/SourceFormatAdapter.java|   2 +-
 .../apache/hudi/utilities/streamer/StreamSync.java |  51 --
 .../deltastreamer/TestHoodieDeltaStreamer.java |   4 +-
 .../streamer/TestStreamSyncUnitTests.java  | 192 +
 5 files changed, 241 insertions(+), 15 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index c346f7665df..be3d2f4ed4b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -74,7 +74,12 @@ class DefaultSource extends RelationProvider
   override def createRelation(sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = 
{
 try {
-  createRelation(sqlContext, parameters, null)
+  val relation = createRelation(sqlContext, parameters, null)
+  if (relation.schema.isEmpty) {
+new EmptyRelation(sqlContext, new StructType())
+  } else {
+relation
+  }
 } catch {
   case _: HoodieSchemaNotFoundException => new EmptyRelation(sqlContext, 
new StructType())
   case e => throw e
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index f29404701db..1796c96dab8 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -62,7 +62,7 @@ import static 
org.apache.hudi.utilities.streamer.BaseErrorTableWriter.ERROR_TABL
 /**
  * Adapts data-format provided by the source to the data-format required by 
the client (DeltaStreamer).
  */
-public final class SourceFormatAdapter implements Closeable {
+public class SourceFormatAdapter implements Closeable {
 
   private final Source source;
   private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue();
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 1453e9fd07c..ded5348ed8f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -255,6 +256,31 @@ public class StreamSync implements Serializable, Closeable 
{
 
   private final boolean useRowWriter;
 
+  @VisibleForTesting
+  StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
+ TypedProperties props, HoodieSparkEngineContext 
hoodieSparkContext, FileSystem fs, Configuration conf,
+ Function 
onInitializingHoodieWriteClient, SchemaProvider userProvidedSchemaProvider,
+ Option errorTableWriter, 
SourceFormatAdapter formatAdapter, Option transformer,
+ boolean useRowWriter, boolean autoGenerateRecordKeys) {
+this.cfg = cfg;
+this.hoodieSparkContext = hoodieSparkContext;
+this.sparkSession = sparkSession;
+this.fs = fs;
+this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
+this.props = props;
+this.userProvidedSchemaProvider = userProvidedSchemaProvider;
+this.processedSchema = new SchemaSet();
+this.autoGenerateRecordKeys = autoGenerateRecordKeys;
+this.keyGenClassName = getKeyGeneratorClassName(new 
TypedProperties(props));
+this.conf = conf;
+
+this.errorTableWriter = errorTableWriter;
+this.formatAdapter = formatAdapter;
+this.transformer = transformer;
+this.

(hudi) branch master updated: [HUDI-7518] Fix HoodieMetadataPayload merging logic around repeated deletes (#10913)

2024-03-26 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 8a137631da8 [HUDI-7518] Fix HoodieMetadataPayload merging logic around 
repeated deletes (#10913)
8a137631da8 is described below

commit 8a137631da86c0ee99cf847b317484414ca4b7a2
Author: Y Ethan Guo 
AuthorDate: Tue Mar 26 19:13:09 2024 -0700

[HUDI-7518] Fix HoodieMetadataPayload merging logic around repeated deletes 
(#10913)
---
 .../common/testutils/HoodieMetadataTestTable.java  |  11 ++
 .../functional/TestHoodieBackedTableMetadata.java  | 126 -
 .../hudi/metadata/HoodieMetadataPayload.java   |  53 +
 .../hudi/common/testutils/HoodieTestTable.java |  13 +++
 .../hudi/metadata/TestHoodieMetadataPayload.java   |  87 --
 5 files changed, 254 insertions(+), 36 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index 3bcba72eb68..b1974ff3e4e 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.testutils;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -134,6 +135,16 @@ public class HoodieMetadataTestTable extends 
HoodieTestTable {
 return cleanMetadata;
   }
 
+  @Override
+  public void repeatClean(String cleanCommitTime,
+  HoodieCleanerPlan cleanerPlan,
+  HoodieCleanMetadata cleanMetadata) throws 
IOException {
+super.repeatClean(cleanCommitTime, cleanerPlan, cleanMetadata);
+if (writer != null) {
+  writer.update(cleanMetadata, cleanCommitTime);
+}
+  }
+
   public HoodieTestTable addCompaction(String instantTime, 
HoodieCommitMetadata commitMetadata) throws Exception {
 super.addCompaction(instantTime, commitMetadata);
 if (writer != null) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 1a268675ac7..8ca0d4e16a9 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -19,10 +19,14 @@
 package org.apache.hudi.client.functional;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -32,8 +36,12 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -43,7 +51,6 @@ import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
-import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.table.HoodieSparkTable;
 import

[jira] [Created] (HUDI-7549) Data inconsistency issue w/ spurious log block detection

2024-03-25 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7549:
-

 Summary: Data inconsistency issue w/ spurious log block detection
 Key: HUDI-7549
 URL: https://issues.apache.org/jira/browse/HUDI-7549
 Project: Apache Hudi
  Issue Type: Bug
  Components: reader-core
Reporter: sivabalan narayanan


We added support to deduce spurious log blocks in the log block reader 

[https://github.com/apache/hudi/pull/9545]

[https://github.com/apache/hudi/pull/9611] 

in 0.14.0. 

Apparently there are some cases where it could lead to data loss or data 
consistency issues. 

[https://github.com/apache/hudi/pull/9611#issuecomment-2016687160] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7532) Fix schedule compact to only consider DCs after last compaction commit

2024-03-22 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7532:
-

 Summary: Fix schedule compact to only consider DCs after last 
compaction commit 
 Key: HUDI-7532
 URL: https://issues.apache.org/jira/browse/HUDI-7532
 Project: Apache Hudi
  Issue Type: Bug
  Components: compaction
Reporter: sivabalan narayanan


Fix schedule compact to only consider delta commits (DCs) after the last compaction 
commit. As of now, it also considers replace commits. 
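
A minimal sketch (assuming the timeline classes already referenced elsewhere in this archive; not the actual scheduling code) of restricting the candidates to delta commits strictly after the last compaction instant:

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

final class CompactionCandidateFilter {
  // Keep only delta commits newer than the last compaction commit; replace
  // commits are excluded on purpose.
  static List<HoodieInstant> deltaCommitsAfter(List<HoodieInstant> completedInstants,
                                               String lastCompactionTs) {
    return completedInstants.stream()
        .filter(i -> HoodieTimeline.DELTA_COMMIT_ACTION.equals(i.getAction()))
        .filter(i -> i.getTimestamp().compareTo(lastCompactionTs) > 0)
        .collect(Collectors.toList());
  }
}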

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7528) Fix RowCustomColumnsSortPartitioner to use repartition instead of coalesce

2024-03-22 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7528:
-

 Summary: Fix RowCustomColumnsSortPartitioner to use repartition 
instead of coalesce
 Key: HUDI-7528
 URL: https://issues.apache.org/jira/browse/HUDI-7528
 Project: Apache Hudi
  Issue Type: Bug
  Components: writer-core
Reporter: sivabalan narayanan


Fix RowCustomColumnsSortPartitioner to use repartition instead of coalesce
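
For context, a small illustration (not the Hudi partitioner itself) of the difference the fix targets: coalesce(n) can only reduce or keep the current partition count, while repartition(n) shuffles to exactly n output partitions, which is what the configured bulk-insert parallelism expects.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class RepartitionVsCoalesce {
  static Dataset<Row> toExactParallelism(Dataset<Row> rows, int outputPartitions) {
    // coalesce(outputPartitions) would silently keep fewer partitions when the
    // input already has fewer; repartition always shuffles to the requested count.
    return rows.repartition(outputPartitions);
  }
}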



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated: [MINOR] Remove redundant fileId from HoodieAppendHandle (#10901)

2024-03-21 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 135db099afc [MINOR] Remove redundant fileId from HoodieAppendHandle 
(#10901)
135db099afc is described below

commit 135db099afc99ad12c758c5428233614cf14a49a
Author: wombatu-kun 
AuthorDate: Fri Mar 22 03:12:01 2024 +0700

[MINOR] Remove redundant fileId from HoodieAppendHandle (#10901)

Co-authored-by: Vova Kolmakov 
---
 .../src/main/java/org/apache/hudi/io/HoodieAppendHandle.java| 2 --
 1 file changed, 2 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 8d0eb2305e6..93df86e170d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -89,7 +89,6 @@ public class HoodieAppendHandle extends 
HoodieWriteHandle recordList = new ArrayList<>();
@@ -158,7 +157,6 @@ public class HoodieAppendHandle extends 
HoodieWriteHandle();



[jira] [Created] (HUDI-7526) Fix constructors for all bulk insert sort partitioners to ensure we could use it as user defined partitioners

2024-03-21 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7526:
-

 Summary: Fix constructors for all bulk insert sort partitioners to 
ensure we could use it as user defined partitioners 
 Key: HUDI-7526
 URL: https://issues.apache.org/jira/browse/HUDI-7526
 Project: Apache Hudi
  Issue Type: Bug
  Components: writer-core
Reporter: sivabalan narayanan


Our constructor for a user-defined sort partitioner takes in the write config, while 
some of the partitioners used in the out-of-the-box sort modes do not account for 
it. 

 

Let's fix the sort partitioners to ensure any of them can be used as user-defined 
partitioners. 

For example, NoneSortMode does not have a constructor that takes in the write config. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-03-18 Thread sivabalan narayanan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828123#comment-17828123
 ] 

sivabalan narayanan edited comment on HUDI-7507 at 3/19/24 1:12 AM:


Just trying to replay the same scenario for the data table: conflict resolution 
could have aborted Job 2, and hence we may not hit the same issue. 


was (Author: shivnarayan):
Just trying to replay the same scenario for data table, conflict resolution 
could have aborted job2. and hence we may not hit the same issue. 

>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Krishen Bhan
>Priority: Major
>
> Although HUDI operations hold a table lock when creating a .requested 
> instant, because HUDI writers do not generate a timestamp and create a 
> .requested plan in the same transaction, there can be a scenario where 
>  # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp 
> (x - 1)
>  # Job 1 schedules and creates requested file with instant timestamp (x)
>  # Job 2 schedules and creates requested file with instant timestamp (x-1)
>  # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can 
> cause issues:
>  * 
>  ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
> if Job 1 runs before Job 2, it can create a compaction plan for all instant 
> times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 
> will create instant time (x-1), but the timeline will be in a corrupted state 
> since the compaction plan was supposed to include (x-1)
>  ** There is a similar issue with clean. If Job2 is a long-running commit 
> (that was stuck/delayed for a while before creating its .requested plan) and 
> Job 1 is a clean, then Job 1 can perform a clean that updates the 
> earliest-commit-to-retain without waiting for the inflight instant by Job 2 
> at (x-1) to complete. This causes Job2 to be "skipped" by clean.
> One way this can be resolved is by combining the operations of generating 
> instant time and creating a requested file in the same HUDI table 
> transaction. Specifically, executing the following steps whenever any instant 
> (commit, table service, etc) is scheduled
>  # Acquire table lock
>  # Look at the latest instant C on the active timeline (completed or not). 
> Generate a timestamp after C
>  # Create the plan and requested file using this new timestamp ( that is 
> greater than C)
>  # Release table lock
> Unfortunately this has the following drawbacks
>  * Every operation must now hold the table lock when computing its plan, even 
> if it's an expensive operation and will take a while
>  * Users of HUDI cannot easily set their own instant time of an operation, 
> and this restriction would break any public APIs that allow this
> An alternate approach (suggested by [~pwason] ) was to instead have all 
> operations including table services perform conflict resolution checks before 
> committing. For example, clean and compaction would generate their plan as 
> usual. But when creating a transaction to write a .requested file, right 
> before creating the file they should check if another lower timestamp instant 
> has appeared in the timeline. And if so, they should fail/abort without 
> creating the plan. Commit operations would also be updated/verified to have 
> similar check, before creating a .requested file (during a transaction) the 
> commit operation will check if a table service plan (clean/compact) with a 
> greater instant time has been created. And if so, would abort/fail. This 
> avoids the drawbacks of the first approach, but will lead to more transient 
> failures that users have to handle.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-03-18 Thread sivabalan narayanan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828142#comment-17828142
 ] 

sivabalan narayanan commented on HUDI-7507:
---

We have already fixed it with the latest master (1.0) by generating the new commit 
times using locks. That should solve the issue. We can apply the same to the 0.X 
branch. 

>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Krishen Bhan
>Priority: Major
>
> Although HUDI operations hold a table lock when creating a .requested 
> instant, because HUDI writers do not generate a timestamp and create a 
> .requested plan in the same transaction, there can be a scenario where 
>  # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp 
> (x - 1)
>  # Job 1 schedules and creates requested file with instant timestamp (x)
>  # Job 2 schedules and creates requested file with instant timestamp (x-1)
>  # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can 
> cause issues:
>  * 
>  ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
> if Job 1 runs before Job 2, it can create a compaction plan for all instant 
> times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 
> will create instant time (x-1), but the timeline will be in a corrupted state 
> since the compaction plan was supposed to include (x-1)
>  ** There is a similar issue with clean. If Job2 is a long-running commit 
> (that was stuck/delayed for a while before creating its .requested plan) and 
> Job 1 is a clean, then Job 1 can perform a clean that updates the 
> earliest-commit-to-retain without waiting for the inflight instant by Job 2 
> at (x-1) to complete. This causes Job2 to be "skipped" by clean.
> One way this can be resolved is by combining the operations of generating 
> instant time and creating a requested file in the same HUDI table 
> transaction. Specifically, executing the following steps whenever any instant 
> (commit, table service, etc) is scheduled
>  # Acquire table lock
>  # Look at the latest instant C on the active timeline (completed or not). 
> Generate a timestamp after C
>  # Create the plan and requested file using this new timestamp ( that is 
> greater than C)
>  # Release table lock
> Unfortunately this has the following drawbacks
>  * Every operation must now hold the table lock when computing its plan, even 
> if it's an expensive operation and will take a while
>  * Users of HUDI cannot easily set their own instant time of an operation, 
> and this restriction would break any public APIs that allow this
> An alternate approach (suggested by [~pwason] ) was to instead have all 
> operations including table services perform conflict resolution checks before 
> committing. For example, clean and compaction would generate their plan as 
> usual. But when creating a transaction to write a .requested file, right 
> before creating the file they should check if another lower timestamp instant 
> has appeared in the timeline. And if so, they should fail/abort without 
> creating the plan. Commit operations would also be updated/verified to have 
> similar check, before creating a .requested file (during a transaction) the 
> commit operation will check if a table service plan (clean/compact) with a 
> greater instant time has been created. And if so, would abort/fail. This 
> avoids the drawbacks of the first approach, but will lead to more transient 
> failures that users have to handle.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (HUDI-7507) ongoing concurrent writers with smaller timestamp can cause issues with table services

2024-03-18 Thread sivabalan narayanan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828123#comment-17828123
 ] 

sivabalan narayanan commented on HUDI-7507:
---

Just trying to replay the same scenario for the data table: conflict resolution 
could have aborted Job 2, and hence we may not hit the same issue. 

>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Krishen Bhan
>Priority: Major
>
> Although HUDI operations hold a table lock when creating a .requested 
> instant, because HUDI writers do not generate a timestamp and create a 
> .requested plan in the same transaction, there can be a scenario where 
>  # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp 
> (x - 1)
>  # Job 1 schedules and creates requested file with instant timestamp (x)
>  # Job 2 schedules and creates requested file with instant timestamp (x-1)
>  # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can 
> cause issues:
>  * 
>  ** If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then 
> if Job 1 runs before Job 2, it can create a compaction plan for all instant 
> times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 
> will create instant time (x-1), but the timeline will be in a corrupted state 
> since the compaction plan was supposed to include (x-1)
>  ** There is a similar issue with clean. If Job2 is a long-running commit 
> (that was stuck/delayed for a while before creating its .requested plan) and 
> Job 1 is a clean, then Job 1 can perform a clean that updates the 
> earliest-commit-to-retain without waiting for the inflight instant by Job 2 
> at (x-1) to complete. This causes Job2 to be "skipped" by clean.
> One way this can be resolved is by combining the operations of generating 
> instant time and creating a requested file in the same HUDI table 
> transaction. Specifically, executing the following steps whenever any instant 
> (commit, table service, etc) is scheduled
>  # Acquire table lock
>  # Look at the latest instant C on the active timeline (completed or not). 
> Generate a timestamp after C
>  # Create the plan and requested file using this new timestamp ( that is 
> greater than C)
>  # Release table lock
> Unfortunately this has the following drawbacks
>  * Every operation must now hold the table lock when computing its plan, even 
> if its an expensive operation and will take a while
>  * Users of HUDI cannot easily set their own instant time of an operation, 
> and this restriction would break any public APIs that allow this
> An alternate approach (suggested by [~pwason] ) was to instead have all 
> operations including table services perform conflict resolution checks before 
> committing. For example, clean and compaction would generate their plan as 
> usual. But when creating a transaction to write a .requested file, right 
> before creating the file they should check if another lower timestamp instant 
> has appeared in the timeline. And if so, they should fail/abort without 
> creating the plan. Commit operations would also be updated/verified to have 
> similar check, before creating a .requested file (during a transaction) the 
> commit operation will check if a table service plan (clean/compact) with a 
> greater instant time has been created. And if so, would abort/fail. This 
> avoids the drawbacks of the first approach, but will lead to more transient 
> failures that users have to handle.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7511) Offset range calculation in kafka should return all topic partitions

2024-03-17 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7511:
-

 Summary: Offset range calculation in kafka should return all topic 
partitions 
 Key: HUDI-7511
 URL: https://issues.apache.org/jira/browse/HUDI-7511
 Project: Apache Hudi
  Issue Type: Bug
  Components: deltastreamer
Reporter: sivabalan narayanan


After [https://github.com/apache/hudi/pull/10869] landed, we are not 
returning every topic partition in the final ranges. But for checkpointing purposes, 
we need to have every Kafka topic partition in the final ranges even if we are not 
consuming anything from it. 
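
A minimal sketch of the intended behavior (OffsetRange is the spark-streaming-kafka-0-10 class; the map names and the allocation step are assumptions, not KafkaOffsetGen's actual fields):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.OffsetRange;

final class OffsetRangePadding {
  // One range per topic partition; partitions with nothing to consume still get an
  // empty range (fromOffset == untilOffset) so the checkpoint keeps tracking them.
  static List<OffsetRange> withAllPartitions(Map<TopicPartition, Long> fromOffsets,
                                             Map<TopicPartition, Long> allocatedUntilOffsets) {
    List<OffsetRange> ranges = new ArrayList<>();
    for (Map.Entry<TopicPartition, Long> e : fromOffsets.entrySet()) {
      long from = e.getValue();
      long until = allocatedUntilOffsets.getOrDefault(e.getKey(), from);
      ranges.add(OffsetRange.create(e.getKey().topic(), e.getKey().partition(), from, until));
    }
    return ranges;
  }
}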

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated (ca2140e2003 -> 3c8488b831c)

2024-03-15 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


from ca2140e2003 [MINOR] rename KeyGenUtils#enableAutoGenerateRecordKeys 
(#10871)
 add 3c8488b831c [HUDI-7506] Compute offsetRanges based on 
eventsPerPartition allocated in each range (#10869)

No new revisions were added by this update.

Summary of changes:
 .../utilities/sources/helpers/KafkaOffsetGen.java  |  88 ++-
 .../sources/helpers/TestCheckpointUtils.java   | 167 -
 .../sources/helpers/TestKafkaOffsetGen.java|  10 +-
 3 files changed, 179 insertions(+), 86 deletions(-)



(hudi) branch master updated (2af83e2d9a8 -> dc349f5293f)

2024-03-08 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


from 2af83e2d9a8 [HUDI-7411] Meta sync should consider cleaner commit 
(#10676)
 add dc349f5293f [ENG-6316] Bump cleaner retention for MDT (#537) (#10655)

No new revisions were added by this update.

Summary of changes:
 .../hudi/metadata/HoodieMetadataWriteUtils.java| 28 +++---
 .../metadata/TestHoodieMetadataWriteUtils.java | 64 ++
 2 files changed, 84 insertions(+), 8 deletions(-)
 create mode 100644 
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java



(hudi) branch asf-site updated: [DOCS] Updated inline and async process with more details (#10664)

2024-03-08 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 19734b58a62 [DOCS] Updated inline and async process with more details 
(#10664)
19734b58a62 is described below

commit 19734b58a6230930fea2ad1bb748d5d6f6bb24c7
Author: nadine farah 
AuthorDate: Fri Mar 8 10:16:59 2024 -0800

[DOCS] Updated inline and async process with more details (#10664)
---
 website/docs/clustering.md | 18 ++
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/website/docs/clustering.md b/website/docs/clustering.md
index f61c61a4476..149b690ff3b 100644
--- a/website/docs/clustering.md
+++ b/website/docs/clustering.md
@@ -170,9 +170,14 @@ for inline or async clustering are shown below with code 
samples.
 
 ## Inline clustering
 
-Inline clustering happens synchronously with the regular ingestion writer, 
which means the next round of ingestion
-cannot proceed until the clustering is complete. Inline clustering can be 
setup easily using spark dataframe options.
-See sample below
+Inline clustering happens synchronously with the regular ingestion writer or 
as part of the data ingestion pipeline. This means the next round of ingestion 
cannot proceed until the clustering is complete With inline clustering, Hudi 
will schedule, plan clustering operations after each commit is completed and 
execute the clustering plans after it’s created. This is the simplest 
deployment model to run because it’s easier to manage than running different 
asynchronous Spark jobs. This mode  [...]
+
+For this deployment mode, please enable and set: `hoodie.clustering.inline` 
+
+To choose how often clustering is triggered, also set: 
`hoodie.clustering.inline.max.commits`. 
+
+Inline clustering can be setup easily using spark dataframe options.
+See sample below:
 
 ```scala
 import org.apache.hudi.QuickstartUtils._
@@ -202,7 +207,12 @@ df.write.format("org.apache.hudi").
 
 ## Async Clustering
 
-Async clustering runs the clustering table service in the background without 
blocking the regular ingestions writers.
+Async clustering runs the clustering table service in the background without 
blocking the regular ingestions writers. There are three different ways to 
deploy an asynchronous clustering process: 
+
+- **Asynchronous execution within the same process**: In this deployment mode, 
Hudi will schedule and plan the clustering operations after each commit is 
completed as part of the ingestion pipeline. Separately, Hudi spins up another 
thread within the same job and executes the clustering table service. This is 
supported by Spark Streaming, Flink and DeltaStreamer in continuous mode. For 
this deployment mode, please enable `hoodie.clustering.async.enabled` and 
`hoodie.clustering.async.max. [...]
+- **Asynchronous scheduling and execution by a separate process**: In this 
deployment mode, the application will write data to a Hudi table as part of the 
ingestion pipeline. A separate clustering job will schedule, plan and execute 
the clustering operation. By running a different job for the clustering 
operation, it rebalances how Hudi uses compute resources: fewer compute 
resources are needed for the ingestion, which makes ingestion latency stable, 
and an independent set of compute res [...]
+- **Scheduling inline and executing async**: In this deployment mode, the 
application ingests data and schedules the clustering in one job; in another, 
the application executes the clustering plan. The supported writers (see below) 
won’t be blocked from ingesting data. If the metadata table is enabled, a lock 
provider is not needed. However, if the metadata table is enabled, please 
ensure all jobs have the lock providers configured for concurrency control. All 
writers support this deploy [...]
+
 Hudi supports 
[multi-writers](https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing)
 which provides
 snapshot isolation between multiple table services, thus allowing writers to 
continue with ingestion while clustering
 runs in the background.
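
For quick reference, a hedged example of the inline-clustering settings named in the doc change above; the values below are examples only, not recommendations:

import java.util.HashMap;
import java.util.Map;

final class ClusteringOptionsExample {
  static Map<String, String> inlineClusteringOptions() {
    Map<String, String> opts = new HashMap<>();
    opts.put("hoodie.clustering.inline", "true");
    // Example: trigger clustering after every 4 commits.
    opts.put("hoodie.clustering.inline.max.commits", "4");
    return opts;
  }
}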



(hudi) branch master updated: [HUDI-7411] Meta sync should consider cleaner commit (#10676)

2024-03-08 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 2af83e2d9a8 [HUDI-7411] Meta sync should consider cleaner commit 
(#10676)
2af83e2d9a8 is described below

commit 2af83e2d9a8fbb6cc33fdf29e38b72684c2da4ca
Author: Sagar Sumit 
AuthorDate: Fri Mar 8 23:34:53 2024 +0530

[HUDI-7411] Meta sync should consider cleaner commit (#10676)
---
 .../hudi/common/table/timeline/TimelineUtils.java  | 27 ++---
 .../hudi/common/table/TestTimelineUtils.java   | 46 --
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |  7 +---
 .../apache/hudi/sync/common/HoodieSyncClient.java  |  5 +--
 4 files changed, 67 insertions(+), 18 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 5e710800d6f..dbe8f83fdbe 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -81,13 +81,15 @@ public class TimelineUtils {
   }
 
   /**
-   * Returns partitions that have been deleted or marked for deletion in the 
given timeline.
+   * Returns partitions that have been deleted or marked for deletion in the 
timeline between given commit time range.
* Does not include internal operations such as clean in the timeline.
*/
-  public static List getDroppedPartitions(HoodieTimeline timeline) {
+  public static List getDroppedPartitions(HoodieTableMetaClient 
metaClient, Option lastCommitTimeSynced, Option 
lastCommitCompletionTimeSynced) {
+HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
+? TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced)
+: metaClient.getActiveTimeline();
 HoodieTimeline completedTimeline = 
timeline.getWriteTimeline().filterCompletedInstants();
 HoodieTimeline replaceCommitTimeline = 
completedTimeline.getCompletedReplaceTimeline();
-
 Map partitionToLatestDeleteTimestamp = 
replaceCommitTimeline.getInstantsAsStream()
 .map(instant -> {
   try {
@@ -102,6 +104,21 @@ public class TimelineUtils {
 .flatMap(pair -> 
pair.getRight().getPartitionToReplaceFileIds().keySet().stream()
 .map(partition -> new AbstractMap.SimpleEntry<>(partition, 
pair.getLeft().getTimestamp()))
 ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 
(existing, replace) -> replace));
+// cleaner could delete a partition when there are no active filegroups in 
the partition
+HoodieTimeline cleanerTimeline = 
metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants();
+cleanerTimeline.getInstantsAsStream()
+.forEach(instant -> {
+  try {
+HoodieCleanMetadata cleanMetadata = 
TimelineMetadataUtils.deserializeHoodieCleanMetadata(cleanerTimeline.getInstantDetails(instant).get());
+cleanMetadata.getPartitionMetadata().forEach((partition, 
partitionMetadata) -> {
+  if (partitionMetadata.getIsPartitionDeleted()) {
+partitionToLatestDeleteTimestamp.put(partition, 
instant.getTimestamp());
+  }
+});
+  } catch (IOException e) {
+throw new HoodieIOException("Failed to get partitions cleaned at " 
+ instant, e);
+  }
+});
 
 if (partitionToLatestDeleteTimestamp.isEmpty()) {
   // There is no dropped partitions
@@ -244,7 +261,7 @@ public class TimelineUtils {
 
   return false;
 } catch (IOException e) {
-  throw new HoodieIOException("Unable to read instant information: " + 
instant + " for " + metaClient.getBasePath(), e);
+  throw new HoodieIOException("Unable to read instant information: " + 
instant + " for " + metaClient.getBasePathV2().toString(), e);
 }
   }
 
@@ -440,7 +457,7 @@ public class TimelineUtils {
   }
 
   public enum HollowCommitHandling {
-FAIL, BLOCK, USE_TRANSITION_TIME;
+FAIL, BLOCK, USE_TRANSITION_TIME
   }
 
   /**
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index c81a05b4c20..d258753c3a8 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -158,7 +158,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
 
   HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
   activeTimeline.createN

[jira] [Created] (HUDI-7491) Handle null extra metadata w/ clean commit metadata

2024-03-07 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7491:
-

 Summary: Handle null extra metadata w/ clean commit metadata
 Key: HUDI-7491
 URL: https://issues.apache.org/jira/browse/HUDI-7491
 Project: Apache Hudi
  Issue Type: Bug
  Components: cleaning
Reporter: sivabalan narayanan


[https://github.com/apache/hudi/pull/10651/] 

 

After this fix, older clean commits may not have any extra metadata. We need to 
handle null for the entire map. 
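
A tiny sketch of the null handling being asked for (plain Java; the helper name is illustrative):

import java.util.Collections;
import java.util.Map;

final class CleanMetadataCompat {
  // Older clean commit metadata may carry no extra-metadata map at all, so treat
  // the whole map, not just individual keys, as optional.
  static Map<String, String> safeExtraMetadata(Map<String, String> extraMetadata) {
    return extraMetadata == null ? Collections.emptyMap() : extraMetadata;
  }
}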

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7490) Fix archival guarding data files not yet cleaned up by cleaner when savepoint is removed

2024-03-07 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7490:
--
Description: 
We added a fix recently where cleaner will take care of cleaning up savepointed 
files too w/o fail with 

[https://github.com/apache/hudi/pull/10651] 

Scenario the above patch fixes:

By default, incremental cleaning is enabled. During planning, the cleaner will only 
account for partitions touched in recent commits (after the earliest commit to 
retain from the last completed clean). 

So, if a savepoint is added and removed later on, the cleaner might miss cleaning 
those files. We fixed that gap in the above patch. 

Fix: Clean commit metadata will track savepointed commits. So, the next time the 
clean planner runs, we find the mismatch between the tracked savepointed commits and 
the current savepoints from the timeline, and if there is a difference, the cleaner 
will account for partitions touched by the savepointed commit.  

 

 

But we might have a gap wrt archival. 

If we ensure archival will run just after cleaning and not independently, we 
should be good.

But otherwise there is a chance we could expose duplicate data to readers, as in the 
scenario below. 

 

Let's say we have a savepoint at t5.commit. So, the cleaner skipped deleting the 
files created at t5 and went past it. And say we have a replace commit at t10 
which replaced all data files that were created at t5. 

With this state, say we removed the savepoint. 

We will have data files created by t5.commit in the data directory. 

As long as t10 is in the active timeline, readers will only see files written by 
t10 and will ignore files written by t5. 

At this juncture, if we run archival (without the cleaner), archival might archive t5 
to t10, in which case data files written by both t5 and t10 will be exposed to 
readers. 

In the most common deployment models, where we recommend stopping the pipeline while 
doing savepoint and restore or deleting a savepoint, this might be uncommon. But 
there is a chance that this could happen. 

 

So, we have to guard the archival in this case. 

Essentially, we need to ensure before archiving a replace commit, the fileIds 
that were replaced are cleaned by the cleaner. 

 

Probable fix:

We can follow similar approach we followed in 
[https://github.com/apache/hudi/pull/10651]  . 

Essentially check for list of savepoints in current timeline and compare it w/ 
savepointed instants in latest clean commit metadata. If they match, we do not 
need to block archival. but if there is a difference (which means a savepoint 
was deleted in timeline and cleaner has not got a chance to cleanup yet), we 
should punt archiving anything and come back next time. 
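
A minimal, self-contained sketch of the guard (the class, method, and inputs are illustrative assumptions, not the actual Hudi implementation):

  import java.util.Set;

  class ArchivalGuardSketch {
    // Archival may proceed only when the savepoints currently on the timeline match the
    // savepointed instants recorded in the latest completed clean commit metadata.
    static boolean safeToArchive(Set<String> savepointsOnTimeline,
                                 Set<String> savepointsTrackedByLatestClean) {
      // A mismatch means a savepoint was deleted after the last clean, so the cleaner has
      // not yet removed the files it previously skipped; archival should back off this round.
      return savepointsOnTimeline.equals(savepointsTrackedByLatestClean);
    }
  }

The archival planner would run such a check before selecting instants to archive and produce an empty plan whenever it returns false.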

 

 

 

 

  was:
We added a fix recently where cleaner will take care of cleaning up savepointed 
files too w/o fail with 

[https://github.com/apache/hudi/pull/10651] 

 

But we might have a gap wrt archival. 

If we ensure archival will run just after cleaning and not independently, we 
should be good.

but if there is a chance we could expose duplicate data to readers w/ below 
scenario. 

 

lets say we have a savepoint at t5.commit. So, cleaner skipped to delete the 
files created at t5 and went past it. and say we have a replace commit at t10 
which replaced all data files that were created at t5. 

w/ this state, say we removed the savepoint. 

we will have data files created by t5.commit in data directory. 

as long as t10 is in active timeline, readers will only see files written by 
t10 and will ignore files written by t5. 

at this juncture, if we run archival (w/o cleaner), archival might archive t5 
to t10. on which case both data files written by t5 and t10 will be exposed to 
readers. 

In most common deployment models, where we recommend to stop the pipeline while 
doing savepoint and restore or deleting savepoint, this might be uncommon. but 
there is a chance that this could happen. 

 

So, we have to guard the archival in this case. 

Essentially, we need to ensure before archiving a replace commit, the fileIds 
that were replaced are cleaned by the cleaner. 

 

Probable fix:

We can follow similar approach we followed in 
[https://github.com/apache/hudi/pull/10651]  . 

Essentially check for list of savepoints in current timeline and compare it w/ 
savepointed instants in latest clean commit metadata. If they match, we do not 
need to block archival. but if there is a difference (which means a savepoint 
was deleted in timeline and cleaner has not got a chance to cleanup yet), we 
should punt archiving anything and come back next time. 

 

 

 

 


> Fix archival guarding data files not yet cleaned up by cleaner when savepoint 
> is removed
> 
>
> Key: HUDI-7490
> URL: https://issues.apache.org/jira/browse/HUDI-7490
>

[jira] [Updated] (HUDI-7490) Fix archival guarding data files not yet cleaned up by cleaner when savepoint is removed

2024-03-07 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-7490:
--
Description: 
We added a fix recently where cleaner will take care of cleaning up savepointed 
files too w/o fail with 

[https://github.com/apache/hudi/pull/10651] 

 

But we might have a gap wrt archival. 

If we ensure archival will run just after cleaning and not independently, we 
should be good.

but if there is a chance we could expose duplicate data to readers w/ below 
scenario. 

 

lets say we have a savepoint at t5.commit. So, cleaner skipped to delete the 
files created at t5 and went past it. and say we have a replace commit at t10 
which replaced all data files that were created at t5. 

w/ this state, say we removed the savepoint. 

we will have data files created by t5.commit in data directory. 

as long as t10 is in active timeline, readers will only see files written by 
t10 and will ignore files written by t5. 

at this juncture, if we run archival (w/o cleaner), archival might archive t5 
to t10. on which case both data files written by t5 and t10 will be exposed to 
readers. 

In most common deployment models, where we recommend to stop the pipeline while 
doing savepoint and restore or deleting savepoint, this might be uncommon. but 
there is a chance that this could happen. 

 

So, we have to guard the archival in this case. 

Essentially, we need to ensure before archiving a replace commit, the fileIds 
that were replaced are cleaned by the cleaner. 

 

Probable fix:

We can follow similar approach we followed in 
[https://github.com/apache/hudi/pull/10651]  . 

Essentially check for list of savepoints in current timeline and compare it w/ 
savepointed instants in latest clean commit metadata. If they match, we do not 
need to block archival. but if there is a difference (which means a savepoint 
was deleted in timeline and cleaner has not got a chance to cleanup yet), we 
should punt archiving anything and come back next time. 

 

 

 

 

  was:
We added a fix recently where cleaner will take care of cleaning up savepointed 
files too w/o fail with 

[https://github.com/apache/hudi/pull/10651] 

 

But we might have a gap wrt archival. 

If we ensure archival will run just after cleaning and not independently, we 
should be good.

but if there is a chance we could expose duplicate data to readers w/ below 
scenario. 

 

lets say we have a savepoint at t5.commit. So, cleaner skipped to delete the 
files created at t5 and went past it. and say we have a replace commit at t10 
which replaced all data files that were created at t5. 

w/ this state, say we removed the savepoint. 

we will have data files created by t5.commit in data directory. 

as long as t10 is in active timeline, readers will only see files written by 
t10 and will ignore files written by t5. 

at this juncture, if we run archival (w/o cleaner), archival might archive t5 
to t10. on which case both data files written by t5 and t10 will be exposed to 
readers. 

So, we have to guard the archival in this case. 

Essentially, we need to ensure before archiving a replace commit, the fileIds 
that were replaced are cleaned by the cleaner. 

 

Probable fix:

We can follow similar approach we followed in 
[https://github.com/apache/hudi/pull/10651]  . 

Essentially check for list of savepoints in current timeline and compare it w/ 
savepointed instants in latest clean commit metadata. If they match, we do not 
need to block archival. but if there is a difference (which means a savepoint 
was deleted in timeline and cleaner has not got a chance to cleanup yet), we 
should punt archiving anything and come back next time. 

 

 

 

 


> Fix archival guarding data files not yet cleaned up by cleaner when savepoint 
> is removed
> 
>
> Key: HUDI-7490
> URL: https://issues.apache.org/jira/browse/HUDI-7490
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: archiving, cleaning, clustering
>    Reporter: sivabalan narayanan
>Priority: Major
>
> We added a fix recently where cleaner will take care of cleaning up 
> savepointed files too w/o fail with 
> [https://github.com/apache/hudi/pull/10651] 
>  
> But we might have a gap wrt archival. 
> If we ensure archival will run just after cleaning and not independently, we 
> should be good.
> but if there is a chance we could expose duplicate data to readers w/ below 
> scenario. 
>  
> lets say we have a savepoint at t5.commit. So, cleaner skipped to delete the 
> files created at t5 and went past it. and say we have a replace commit at t10 
> which replaced all data files that were created at t5. 
> w/ this state,

[jira] [Created] (HUDI-7490) Fix archival guarding data files not yet cleaned up by cleaner when savepoint is removed

2024-03-07 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7490:
-

 Summary: Fix archival guarding data files not yet cleaned up by 
cleaner when savepoint is removed
 Key: HUDI-7490
 URL: https://issues.apache.org/jira/browse/HUDI-7490
 Project: Apache Hudi
  Issue Type: Bug
  Components: archiving, cleaning, clustering
Reporter: sivabalan narayanan


We added a fix recently where cleaner will take care of cleaning up savepointed 
files too w/o fail with 

[https://github.com/apache/hudi/pull/10651] 

 

But we might have a gap wrt archival. 

If we ensure archival will run just after cleaning and not independently, we 
should be good.

but if there is a chance we could expose duplicate data to readers w/ below 
scenario. 

 

lets say we have a savepoint at t5.commit. So, cleaner skipped to delete the 
files created at t5 and went past it. and say we have a replace commit at t10 
which replaced all data files that were created at t5. 

w/ this state, say we removed the savepoint. 

we will have data files created by t5.commit in data directory. 

as long as t10 is in active timeline, readers will only see files written by 
t10 and will ignore files written by t5. 

at this juncture, if we run archival (w/o cleaner), archival might archive t5 
to t10. on which case both data files written by t5 and t10 will be exposed to 
readers. 

So, we have to guard the archival in this case. 

Essentially, we need to ensure before archiving a replace commit, the fileIds 
that were replaced are cleaned by the cleaner. 

 

Probable fix:

We can follow similar approach we followed in 
[https://github.com/apache/hudi/pull/10651]  . 

Essentially check for list of savepoints in current timeline and compare it w/ 
savepointed instants in latest clean commit metadata. If they match, we do not 
need to block archival. but if there is a difference (which means a savepoint 
was deleted in timeline and cleaner has not got a chance to cleanup yet), we 
should punt archiving anything and come back next time. 

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated: [HUDI-7337] Implement MetricsReporter that reports metrics to M3 (#10565)

2024-03-05 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 6c7ee8ef210 [HUDI-7337] Implement MetricsReporter that reports metrics 
to M3 (#10565)
6c7ee8ef210 is described below

commit 6c7ee8ef210ae7b410d6e8039a770b5cf27c4067
Author: Krishen <22875197+kb...@users.noreply.github.com>
AuthorDate: Tue Mar 5 08:41:39 2024 -0800

[HUDI-7337] Implement MetricsReporter that reports metrics to M3 (#10565)


-

Co-authored-by: Krishen Bhan <“bkris...@uber.com”>
---
 hudi-client/hudi-client-common/pom.xml |  10 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  28 
 .../hudi/config/metrics/HoodieMetricsM3Config.java | 126 ++
 .../hudi/metadata/HoodieMetadataWriteUtils.java|  10 ++
 .../hudi/metrics/MetricsReporterFactory.java   |   4 +
 .../apache/hudi/metrics/MetricsReporterType.java   |   2 +-
 .../apache/hudi/metrics/m3/M3MetricsReporter.java  | 120 +
 .../hudi/metrics/m3/M3ScopeReporterAdaptor.java| 145 +
 .../org/apache/hudi/metrics/m3/TestM3Metrics.java  |  92 +
 packaging/hudi-flink-bundle/pom.xml|   6 +
 packaging/hudi-integ-test-bundle/pom.xml   |   6 +
 packaging/hudi-kafka-connect-bundle/pom.xml|   6 +
 packaging/hudi-spark-bundle/pom.xml|   7 +
 packaging/hudi-utilities-bundle/pom.xml|   6 +
 packaging/hudi-utilities-slim-bundle/pom.xml   |   6 +
 pom.xml|  12 +-
 16 files changed, 584 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/pom.xml 
b/hudi-client/hudi-client-common/pom.xml
index 1778e2e98b4..9a4a7f2104b 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -120,6 +120,16 @@
       <groupId>io.prometheus</groupId>
       <artifactId>simpleclient_pushgateway</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.uber.m3</groupId>
+      <artifactId>tally-m3</artifactId>
+      <version>${tally.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.uber.m3</groupId>
+      <artifactId>tally-core</artifactId>
+      <version>${tally.version}</version>
+    </dependency>
 
 
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9447069a995..93691e8cdae 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -68,6 +68,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsM3Config;
 import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
@@ -2238,6 +2239,26 @@ public class HoodieWriteConfig extends HoodieConfig {
 return 
getInt(HoodieMetricsGraphiteConfig.GRAPHITE_REPORT_PERIOD_IN_SECONDS);
   }
 
+  public String getM3ServerHost() {
+return getString(HoodieMetricsM3Config.M3_SERVER_HOST_NAME);
+  }
+
+  public int getM3ServerPort() {
+return getInt(HoodieMetricsM3Config.M3_SERVER_PORT_NUM);
+  }
+
+  public String getM3Tags() {
+return getString(HoodieMetricsM3Config.M3_TAGS);
+  }
+
+  public String getM3Env() {
+return getString(HoodieMetricsM3Config.M3_ENV);
+  }
+
+  public String getM3Service() {
+return getString(HoodieMetricsM3Config.M3_SERVICE);
+  }
+
   public String getJmxHost() {
 return getString(HoodieMetricsJmxConfig.JMX_HOST_NAME);
   }
@@ -2745,6 +2766,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 private boolean isPreCommitValidationConfigSet = false;
 private boolean isMetricsJmxConfigSet = false;
 private boolean isMetricsGraphiteConfigSet = false;
+private boolean isMetricsM3ConfigSet = false;
 private boolean isLayoutConfigSet = false;
 
 public Builder withEngineType(EngineType engineType) {
@@ -2984,6 +3006,12 @@ public class HoodieWriteConfig extends HoodieConfig {
   return this;
 }
 
+public Builder withMetricsM3Config(HoodieMetricsM3Config metricsM3Config) {
+  writeConfig.getProps().putAll(metricsM3Config.getProps());
+  isMetricsM3ConfigSet = true;
+  return this;
+}
+
 public Builder withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig 
validatorConfig) {
   writeConfig.getProps().putAll(validatorConfig.getProps());
   isPreCommitValidationConfigSet = true;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/co

[jira] [Created] (HUDI-7478) Fix max delta commits guard check w/ MDT

2024-03-04 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7478:
-

 Summary: Fix max delta commits guard check w/ MDT 
 Key: HUDI-7478
 URL: https://issues.apache.org/jira/browse/HUDI-7478
 Project: Apache Hudi
  Issue Type: Bug
  Components: metadata
Reporter: sivabalan narayanan


protected static void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int maxNumDeltaCommitsWhenPending) {
  final HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
  Option<HoodieInstant> lastCompaction = activeTimeline.filterCompletedInstants()
      .filter(s -> s.getAction().equals(COMPACTION_ACTION)).lastInstant();
  int numDeltaCommits = lastCompaction.isPresent()
      ? activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().getTimestamp()).countInstants()
      : activeTimeline.getDeltaCommitTimeline().countInstants();
  if (numDeltaCommits > maxNumDeltaCommitsWhenPending) {
    throw new HoodieMetadataException(String.format("Metadata table's deltacommits exceeded %d: "
        + "this is likely caused by a pending instant in the data table. Resolve the pending instant "
        + "or adjust `%s`, then restart the pipeline.",
        maxNumDeltaCommitsWhenPending, HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key()));
  }
}






Here we filter on the action type "compaction", but a completed compaction 
instant will have "commit" as its action, so the filter misses it. We need to 
fix this; a possible adjustment is sketched below. 

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7460) Fix compaction schedule with pending delta commits

2024-02-29 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7460:
-

 Summary: Fix compaction schedule with pending delta commits
 Key: HUDI-7460
 URL: https://issues.apache.org/jira/browse/HUDI-7460
 Project: Apache Hudi
  Issue Type: Improvement
  Components: compaction
Reporter: sivabalan narayanan


Hudi has a constraint that compaction can be scheduled only if there are no 
pending delta commits whose instant time is earlier than the compaction instant 
being scheduled. We were throwing an exception when this condition is not met. 
We should fix that behavior so that, instead of throwing an exception, we return 
an empty plan when the condition is violated (see the sketch below).
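
A minimal, self-contained sketch of the intended behavior (method and parameter names are illustrative assumptions, not the actual Hudi scheduler API):

  import java.util.List;
  import java.util.Optional;

  class CompactionScheduleSketch {
    // Return an empty plan instead of throwing when a pending delta commit earlier than the
    // requested compaction instant exists; a later scheduling attempt can succeed once the
    // pending delta commit completes.
    static Optional<String> scheduleCompaction(String compactionInstantTime,
                                               List<String> pendingDeltaCommitTimes) {
      boolean hasEarlierPendingDeltaCommit = pendingDeltaCommitTimes.stream()
          .anyMatch(t -> t.compareTo(compactionInstantTime) < 0);
      if (hasEarlierPendingDeltaCommit) {
        return Optional.empty(); // previously this path threw an exception
      }
      return Optional.of(compactionInstantTime); // placeholder for generating the real plan
    }
  }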



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated (89d267a4bf4 -> 6f7fe8a05a9)

2024-02-27 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


from 89d267a4bf4 [HUDI-5823] Partition ttl management (#9723)
 add 6f7fe8a05a9 [MINOR] Modify filter to allow removal of column stats 
from metadata table for bootstrap table files (#10238)

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java| 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)



[jira] [Assigned] (HUDI-7429) Fix avg record size estimation for delta commits and replace commits

2024-02-20 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7429:
-

Assignee: sivabalan narayanan

> Fix avg record size estimation for delta commits and replace commits
> 
>
> Key: HUDI-7429
> URL: https://issues.apache.org/jira/browse/HUDI-7429
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: writer-core
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>
> avg record size calculation only considers COMMIT for now. lets fix it to 
> include delta commit and replace commits as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7429) Fix avg record size estimation for delta commits and replace commits

2024-02-20 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7429:
-

 Summary: Fix avg record size estimation for delta commits and 
replace commits
 Key: HUDI-7429
 URL: https://issues.apache.org/jira/browse/HUDI-7429
 Project: Apache Hudi
  Issue Type: Improvement
  Components: writer-core
Reporter: sivabalan narayanan


The average record size calculation only considers COMMIT instants for now. 
Let's fix it to include delta commits and replace commits as well (see the 
sketch below).
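
A hedged sketch of the intended instant filter, assuming the standard timeline action constants (the actual estimation code may differ):

  import org.apache.hudi.common.table.timeline.HoodieInstant;
  import org.apache.hudi.common.table.timeline.HoodieTimeline;

  class RecordSizeEstimateSketch {
    // Consider commits, delta commits, and replace commits when sampling instants for the
    // average record size estimate, instead of commits alone.
    static boolean countsTowardsEstimate(HoodieInstant instant) {
      return HoodieTimeline.COMMIT_ACTION.equals(instant.getAction())
          || HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction())
          || HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction());
    }
  }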



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


(hudi) branch master updated: [MINOR] Clarify config descriptions (#10681)

2024-02-15 Thread sivabalan
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 9da1f2b15e2 [MINOR] Clarify config descriptions (#10681)
9da1f2b15e2 is described below

commit 9da1f2b15e2bf873a7d3db56dbc0183479c38c4c
Author: Bhavani Sudha Saktheeswaran <2179254+bhasu...@users.noreply.github.com>
AuthorDate: Thu Feb 15 20:39:30 2024 -0800

[MINOR] Clarify config descriptions (#10681)

This aligns with the doc change here: 
https://github.com/apache/hudi/pull/10680
---
 .../src/main/scala/org/apache/hudi/DataSourceOptions.scala  | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 99080629e17..47a7c61a60f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -500,7 +500,9 @@ object DataSourceWriteOptions {
 .defaultValue("false")
 .markAdvanced()
 .withDocumentation("If set to true, records from the incoming dataframe 
will not overwrite existing records with the same key during the write 
operation. " +
-  "This config is deprecated as of 0.14.0. Please use 
hoodie.datasource.insert.dup.policy instead.");
+  " **Note** Just for Insert operation in Spark SQL writing since 
0.14.0, users can switch to the config `hoodie.datasource.insert.dup.policy` 
instead " +
+  "for a simplified duplicate handling experience. The new config will be 
incorporated into all other writing flows and this config will be fully 
deprecated " +
+  "in future releases.");
 
   val PARTITIONS_TO_DELETE: ConfigProperty[String] = ConfigProperty
 .key("hoodie.datasource.write.partitions.to.delete")
@@ -597,7 +599,7 @@ object DataSourceWriteOptions {
 .withValidValues(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY, 
FAIL_INSERT_DUP_POLICY)
 .markAdvanced()
 .sinceVersion("0.14.0")
-.withDocumentation("When operation type is set to \"insert\", users can 
optionally enforce a dedup policy. This policy will be employed "
+.withDocumentation("**Note** This is only applicable to Spark SQL 
writing.When operation type is set to \"insert\", users can optionally 
enforce a dedup policy. This policy will be employed "
   + " when records being ingested already exists in storage. Default 
policy is none and no action will be taken. Another option is to choose " +
   " \"drop\", on which matching records from incoming will be dropped and 
the rest will be ingested. Third option is \"fail\" which will " +
   "fail the write operation when same records are re-ingested. In other 
words, a given record as deduced by the key generation policy " +



[jira] [Assigned] (HUDI-7407) Add optional clean support to standalone compaction and clustering jobs

2024-02-13 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan reassigned HUDI-7407:
-

Assignee: sivabalan narayanan

> Add optional clean support to standalone compaction and clustering jobs
> ---
>
> Key: HUDI-7407
> URL: https://issues.apache.org/jira/browse/HUDI-7407
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: table-service
>Reporter: sivabalan narayanan
>    Assignee: sivabalan narayanan
>Priority: Major
>  Labels: pull-request-available
>
> Lets add top level config to standalone compaction and clustering job to 
> optionally clean. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7407) Add optional clean support to standalone compaction and clustering jobs

2024-02-13 Thread sivabalan narayanan (Jira)
sivabalan narayanan created HUDI-7407:
-

 Summary: Add optional clean support to standalone compaction and 
clustering jobs
 Key: HUDI-7407
 URL: https://issues.apache.org/jira/browse/HUDI-7407
 Project: Apache Hudi
  Issue Type: Improvement
  Components: table-service
Reporter: sivabalan narayanan


Let's add a top-level config to the standalone compaction and clustering jobs to 
optionally run cleaning. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

