This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 130498708bb add job context (#10848)
130498708bb is described below

commit 130498708bb1cd5da1d0e725971b3d721eeef231
Author: Tim Brown <t...@onehouse.ai>
AuthorDate: Mon Mar 11 20:42:02 2024 -0500

    add job context (#10848)
---
 .../org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java  | 1 +
 .../src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java   | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 15f7b8a6b2c..36f75b6a5b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -263,6 +263,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
                                                                Iterator<HoodieRecord<T>> recordItr) throws IOException;
 
   protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Clustering records for " + config.getTableName());
     HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
     // Mark instant as clustering inflight
     table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index ebb7729c1ee..2d8e0f02c31 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -513,6 +513,7 @@ public class StreamSync implements Serializable, Closeable {
 
   private InputBatch fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String instantTime,
         HoodieTableMetaClient metaClient) {
+    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Fetching next batch: " + cfg.targetTableName);
     HoodieRecordType recordType = createRecordMerger(props).getRecordType();
     if (recordType == HoodieRecordType.SPARK && HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
         && !cfg.operation.equals(WriteOperationType.BULK_INSERT)
@@ -535,7 +536,7 @@ public class StreamSync implements Serializable, Closeable {
     }
 
     // handle empty batch with change in checkpoint
-    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
+    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty: " + cfg.targetTableName);
 
 
     if (useRowWriter) { // no additional processing required for row writer.

Reply via email to