This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 130498708bb add job context (#10848)
130498708bb is described below

commit 130498708bb1cd5da1d0e725971b3d721eeef231
Author: Tim Brown <t...@onehouse.ai>
AuthorDate: Mon Mar 11 20:42:02 2024 -0500

    add job context (#10848)
---
 .../org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java | 1 +
 .../src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 15f7b8a6b2c..36f75b6a5b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -263,6 +263,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
       Iterator<HoodieRecord<T>> recordItr) throws IOException;
 
   protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Clustering records for " + config.getTableName());
     HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
     // Mark instant as clustering inflight
     table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index ebb7729c1ee..2d8e0f02c31 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -513,6 +513,7 @@ public class StreamSync implements Serializable, Closeable {
 
   private InputBatch fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String instantTime, HoodieTableMetaClient metaClient) {
+    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Fetching next batch: " + cfg.targetTableName);
     HoodieRecordType recordType = createRecordMerger(props).getRecordType();
     if (recordType == HoodieRecordType.SPARK && HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
         && !cfg.operation.equals(WriteOperationType.BULK_INSERT)
@@ -535,7 +536,7 @@ public class StreamSync implements Serializable, Closeable {
     }
 
     // handle empty batch with change in checkpoint
-    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
+    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty: " + cfg.targetTableName);
     if (useRowWriter) { // no additional processing required for row writer.