This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 07c3c5d  [HUDI-679] Make io package Spark free (#1460)
07c3c5d is described below

commit 07c3c5d797f612f9b73b2805b65275c2029c2442
Author: leesf <490081...@qq.com>
AuthorDate: Sun Mar 29 16:54:00 2020 +0800

    [HUDI-679] Make io package Spark free (#1460)
    
    * [HUDI-679] Make io package Spark free
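    
    The change threads a SparkTaskContextSupplier from HoodieTable down into
    the write handles and storage writers, so the io package no longer reads
    org.apache.spark.TaskContext directly. A minimal sketch of the resulting
    call pattern, using only names that appear in the diff below (illustrative;
    it assumes the code runs inside a Spark task, where TaskContext is
    available):
    
        // The supplier defers to Spark's TaskContext only when get() is called.
        SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
        int partitionId = supplier.getPartitionIdSupplier().get();
        int stageId = supplier.getStageIdSupplier().get();
        long attemptId = supplier.getAttemptIdSupplier().get();
        // HoodieWriteHandle derives its write token from these three values.
        String writeToken = FSUtils.makeWriteToken(partitionId, stageId, attemptId);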
---
 .../scala/org/apache/hudi/cli/SparkHelpers.scala   |  3 +-
 .../hudi/client/SparkTaskContextSupplier.java      | 42 ++++++++++++++++++++++
 .../hudi/execution/BulkInsertMapFunction.java      |  2 +-
 .../execution/CopyOnWriteLazyInsertIterable.java   | 10 ++++--
 .../execution/MergeOnReadLazyInsertIterable.java   |  9 ++---
 .../org/apache/hudi/io/HoodieAppendHandle.java     | 14 ++++----
 .../org/apache/hudi/io/HoodieCreateHandle.java     | 14 ++++----
 .../java/org/apache/hudi/io/HoodieMergeHandle.java | 14 ++++----
 .../java/org/apache/hudi/io/HoodieWriteHandle.java | 25 +++++++++----
 .../hudi/io/storage/HoodieParquetWriter.java       | 10 +++---
 .../io/storage/HoodieStorageWriterFactory.java     | 13 +++----
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  |  8 ++---
 .../apache/hudi/table/HoodieMergeOnReadTable.java  |  4 +--
 .../java/org/apache/hudi/table/HoodieTable.java    |  7 ++++
 .../hudi/client/TestUpdateSchemaEvolution.java     |  4 +--
 .../hudi/common/HoodieClientTestHarness.java       |  3 ++
 .../apache/hudi/common/HoodieClientTestUtils.java  |  4 ++-
 .../io/storage/TestHoodieStorageWriterFactory.java |  6 ++--
 .../apache/hudi/table/TestCopyOnWriteTable.java    |  2 +-
 19 files changed, 136 insertions(+), 58 deletions(-)

diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
index 4c8e4c1..6fdac1c 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -22,6 +22,7 @@ import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.avro.HoodieAvroWriteSupport
+import org.apache.hudi.client.SparkTaskContextSupplier
 import org.apache.hudi.common.HoodieJsonPayload
 import org.apache.hudi.common.bloom.filter.{BloomFilter, BloomFilterFactory}
 import org.apache.hudi.common.model.HoodieRecord
@@ -45,7 +46,7 @@ object SparkHelpers {
       HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
     val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
-    val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema)
+    val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier())
     for (rec <- sourceRecords) {
       val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
       if (!keysToSkip.contains(key)) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java b/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
new file mode 100644
index 0000000..601dd98
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.spark.TaskContext;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/**
+ * Spark task context supplier.
+ */
+public class SparkTaskContextSupplier implements Serializable {
+
+  public Supplier<Integer> getPartitionIdSupplier() {
+    return () -> TaskContext.getPartitionId();
+  }
+
+  public Supplier<Integer> getStageIdSupplier() {
+    return () -> TaskContext.get().stageId();
+  }
+
+  public Supplier<Long> getAttemptIdSupplier() {
+    return () -> TaskContext.get().taskAttemptId();
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
index 249ff3d..5d4391c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
+++ b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
@@ -51,6 +51,6 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
   @Override
   public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
     return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
-        fileIDPrefixes.get(partition));
+        fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier());
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
index bdcea61..8f98496 100644
--- a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.execution;
 
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -50,15 +51,18 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
   protected final HoodieTable<T> hoodieTable;
   protected final String idPrefix;
   protected int numFilesWritten;
+  protected SparkTaskContextSupplier sparkTaskContextSupplier;
 
   public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
-                                       String instantTime, HoodieTable<T> hoodieTable, String idPrefix) {
+                                       String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+                                       SparkTaskContextSupplier sparkTaskContextSupplier) {
     super(sortedRecordItr);
     this.hoodieConfig = config;
     this.instantTime = instantTime;
     this.hoodieTable = hoodieTable;
     this.idPrefix = idPrefix;
     this.numFilesWritten = 0;
+    this.sparkTaskContextSupplier = sparkTaskContextSupplier;
   }
 
  // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
@@ -137,7 +141,7 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
       // lazily initialize the handle, for the first time
       if (handle == null) {
         handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
-            getNextFileId(idPrefix));
+            getNextFileId(idPrefix), sparkTaskContextSupplier);
       }
 
       if (handle.canWrite(payload.record)) {
@@ -148,7 +152,7 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
         statuses.add(handle.close());
         // Need to handle the rejected payload & open new handle
         handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
-            getNextFileId(idPrefix));
+            getNextFileId(idPrefix), sparkTaskContextSupplier);
         handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
       }
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
index 11c0035..02a9ead 100644
--- a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.execution;
 
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -35,8 +36,8 @@ import java.util.List;
 public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extends CopyOnWriteLazyInsertIterable<T> {
 
   public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
-      String instantTime, HoodieTable<T> hoodieTable, String idPfx) {
-    super(sortedRecordItr, config, instantTime, hoodieTable, idPfx);
+      String instantTime, HoodieTable<T> hoodieTable, String idPfx, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(sortedRecordItr, config, instantTime, hoodieTable, idPfx, sparkTaskContextSupplier);
   }
 
   @Override
@@ -53,7 +54,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
       // lazily initialize the handle, for the first time
       if (handle == null) {
         handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
-                insertPayload.getPartitionPath(), getNextFileId(idPrefix));
+                insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
       }
       if (handle.canWrite(insertPayload)) {
         // write the payload, if the handle has capacity
@@ -64,7 +65,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
         statuses.add(handle.getWriteStatus());
         // Need to handle the rejected payload & open new handle
         handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
-                insertPayload.getPartitionPath(), getNextFileId(idPrefix));
+                insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
         handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
       }
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index f1bd57c..0c08734 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
@@ -49,7 +50,6 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
 import org.apache.spark.util.SizeEstimator;
 
 import java.io.IOException;
@@ -101,16 +101,16 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
   private long insertRecordsWritten = 0;
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-                            String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable);
+                            String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
     writeStatus.setStat(new HoodieDeltaWriteStat());
     this.fileId = fileId;
     this.recordItr = recordItr;
   }
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-                            String partitionPath, String fileId) {
-    this(config, instantTime, hoodieTable, partitionPath, fileId, null);
+                            String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier);
   }
 
   private void init(HoodieRecord record) {
@@ -137,7 +137,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
         //save hoodie partition meta in the partition path
         HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime,
             new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
-        partitionMetadata.trySave(TaskContext.getPartitionId());
+        partitionMetadata.trySave(getPartitionId());
         this.writer = createLogWriter(fileSlice, baseInstantTime);
         this.currentLogFile = writer.getLogFile();
         ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
@@ -163,7 +163,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
         // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
         avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get()));
         String seqId =
-            HoodieRecord.generateSequenceId(instantTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement());
+            HoodieRecord.generateSequenceId(instantTime, getPartitionId(), recordIndex.getAndIncrement());
         HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(),
             hoodieRecord.getPartitionPath(), fileId);
         HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId);
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 1ab22e0..dd8bdac 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -38,7 +39,6 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -56,8 +56,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
   private boolean useWriterSchema = false;
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-      String partitionPath, String fileId) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable);
+      String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
     writeStatus.setFileId(fileId);
     writeStatus.setPartitionPath(partitionPath);
 
@@ -66,10 +66,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
     try {
       HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
           new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
-      partitionMetadata.trySave(TaskContext.getPartitionId());
+      partitionMetadata.trySave(getPartitionId());
       createMarkerFile(partitionPath);
       this.storageWriter =
-          HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema);
+          HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
     }
@@ -80,8 +80,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
    * Called by the compactor code path.
    */
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-      String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator) {
-    this(config, instantTime, hoodieTable, partitionPath, fileId);
+      String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    this(config, instantTime, hoodieTable, partitionPath, fileId, sparkTaskContextSupplier);
     this.recordIterator = recordIterator;
     this.useWriterSchema = true;
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 3a81340..5b95cd0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -46,7 +47,6 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -71,8 +71,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
   private boolean useWriterSchema;
 
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-      Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable);
+       Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
     init(fileId, recordItr);
     init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
   }
@@ -82,8 +82,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
    */
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
       Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
-      HoodieBaseFile dataFileToBeMerged) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable);
+      HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
     this.useWriterSchema = true;
     init(fileId, this.partitionPath, dataFileToBeMerged);
@@ -111,7 +111,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
 
       HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
           new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
-      partitionMetadata.trySave(TaskContext.getPartitionId());
+      partitionMetadata.trySave(getPartitionId());
 
       oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
       String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
@@ -132,7 +132,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
 
       // Create the writer for writing the new version file
       storageWriter =
-          HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema);
+          HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
     } catch (IOException io) {
       LOG.error("Error in update task at commit " + instantTime, io);
       writeStatus.setGlobalError(io);
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 336e508..dd67a6a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -38,7 +39,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
 
 import java.io.IOException;
 
@@ -55,26 +55,27 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
   protected final String partitionPath;
   protected final String fileId;
   protected final String writeToken;
+  protected final SparkTaskContextSupplier sparkTaskContextSupplier;
 
   public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
-                           String fileId, HoodieTable<T> hoodieTable) {
+                           String fileId, HoodieTable<T> hoodieTable, SparkTaskContextSupplier sparkTaskContextSupplier) {
     super(config, instantTime, hoodieTable);
     this.partitionPath = partitionPath;
     this.fileId = fileId;
-    this.writeToken = makeSparkWriteToken();
     this.originalSchema = new Schema.Parser().parse(config.getSchema());
     this.writerSchema = createHoodieWriteSchema(originalSchema);
     this.timer = new HoodieTimer().startTimer();
     this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
         !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
+    this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+    this.writeToken = makeWriteToken();
   }
 
   /**
   * Generate a write token based on the currently running spark task and its place in the spark dag.
   */
-  private static String makeSparkWriteToken() {
-    return FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
-        TaskContext.get().taskAttemptId());
+  private String makeWriteToken() {
+    return FSUtils.makeWriteToken(getPartitionId(), getStageId(), getAttemptId());
   }
 
   public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -171,4 +172,16 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
   protected FileSystem getFileSystem() {
     return hoodieTable.getMetaClient().getFs();
   }
+
+  protected int getPartitionId() {
+    return sparkTaskContextSupplier.getPartitionIdSupplier().get();
+  }
+
+  protected int getStageId() {
+    return sparkTaskContextSupplier.getStageIdSupplier().get();
+  }
+
+  protected long getAttemptId() {
+    return sparkTaskContextSupplier.getAttemptIdSupplier().get();
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index ad8987a..473a806 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.io.storage.HoodieWrapperFileSystem;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -32,7 +33,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.spark.TaskContext;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -52,9 +52,10 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
   private final HoodieAvroWriteSupport writeSupport;
   private final String instantTime;
   private final Schema schema;
+  private final SparkTaskContextSupplier sparkTaskContextSupplier;
 
-  public HoodieParquetWriter(String instantTime, Path file, HoodieParquetConfig parquetConfig, Schema schema)
-      throws IOException {
+  public HoodieParquetWriter(String instantTime, Path file, HoodieParquetConfig parquetConfig,
+      Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
     super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
         ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
         parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(),
@@ -72,6 +73,7 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
     this.writeSupport = parquetConfig.getWriteSupport();
     this.instantTime = instantTime;
     this.schema = schema;
+    this.sparkTaskContextSupplier = sparkTaskContextSupplier;
   }
 
  public static Configuration registerFileSystem(Path file, Configuration conf) {
@@ -85,7 +87,7 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
   @Override
   public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
     String seqId =
-        HoodieRecord.generateSequenceId(instantTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement());
+        HoodieRecord.generateSequenceId(instantTime, sparkTaskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
     HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
         file.getName());
     HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
index 538c2ca..09f8ba9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.bloom.filter.BloomFilter;
 import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,19 +40,19 @@ import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 public class HoodieStorageWriterFactory {
 
   public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
-      String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config, Schema schema)
-      throws IOException {
+      String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config, Schema schema,
+      SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
     final String name = path.getName();
     final String extension = FSUtils.isLogFile(path) ? HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name);
     if (PARQUET.getFileExtension().equals(extension)) {
-      return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable);
+      return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier);
     }
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
 
  private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter(
-      String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable)
-      throws IOException {
+      String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
+      SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
     BloomFilter filter = BloomFilterFactory
         .createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
             config.getDynamicBloomFilterMaxNumEntries(),
@@ -63,6 +64,6 @@ public class HoodieStorageWriterFactory {
         config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
         hoodieTable.getHadoopConf(), config.getParquetCompressionRatio());
 
-    return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema);
+    return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, sparkTaskContextSupplier);
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 0b16efe..250bc51 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -222,13 +222,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-    return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId);
+    return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
   }
 
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
       Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
     return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords,
-            partitionPath, fileId, dataFileToBeMerged);
+            partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier);
   }
 
  public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
@@ -238,13 +238,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       LOG.info("Empty partition");
       return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
     }
-    return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx);
+    return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
   }
 
  public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
      Iterator<HoodieRecord<T>> recordItr) {
    HoodieCreateHandle createHandle =
-        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr);
+        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr, sparkTaskContextSupplier);
    createHandle.write();
    return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index a7c5a68..4690382 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -108,7 +108,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
       return super.handleUpdate(instantTime, partitionPath, fileId, recordItr);
     } else {
       HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, instantTime, this,
-              partitionPath, fileId, recordItr);
+              partitionPath, fileId, recordItr, sparkTaskContextSupplier);
       appendHandle.doAppend();
       appendHandle.close();
       return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
@@ -120,7 +120,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
       throws Exception {
     // If canIndexLogFiles, write inserts to log files else write inserts to parquet files
     if (index.canIndexLogFiles()) {
-      return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx);
+      return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
     } else {
       return super.handleInsert(instantTime, idPfx, recordItr);
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index e38510f..a05e28f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -84,6 +85,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   private SerializableConfiguration hadoopConfiguration;
   private transient FileSystemViewManager viewManager;
 
+  protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
+
   protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc) {
     this.config = config;
    this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration());
@@ -448,4 +451,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
     return new FailSafeConsistencyGuard(fileSystem, config.getConsistencyGuardConfig());
   }
+
+  public SparkTaskContextSupplier getSparkTaskContextSupplier() {
+    return sparkTaskContextSupplier;
+  }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 8c949fb..19919c7 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -89,7 +89,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
           .add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
 
       HoodieCreateHandle createHandle =
-          new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecords.iterator());
+          new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecords.iterator(), supplier);
       createHandle.write();
       return createHandle.close();
     }).collect();
@@ -119,7 +119,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
 
       try {
         HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2,
-                updateRecords.iterator(), record1.getPartitionPath(), fileId);
+                updateRecords.iterator(), record1.getPartitionPath(), fileId, supplier);
         Configuration conf = new Configuration();
         AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchema());
         List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 4e5721f..c5c7d82 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.common;
 
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.TestHoodieClientBase;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
@@ -55,6 +56,8 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   protected transient HoodieTableMetaClient metaClient;
   private static AtomicInteger instantGen = new AtomicInteger(1);
 
+  protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
+
   public String getNextInstant() {
     return String.format("%09d", instantGen.getAndIncrement());
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
index b85de2c..b4601c2 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common;
 
 import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.filter.BloomFilter;
@@ -230,7 +231,8 @@ public class HoodieClientTestUtils {
         ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
         HoodieTestUtils.getDefaultHadoopConf(), Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
     HoodieParquetWriter writer =
-        new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config, schema);
+        new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config,
+                schema, new SparkTaskContextSupplier());
     int seqId = 1;
     for (HoodieRecord record : records) {
      GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
index 6758377..a1492dd 100755
--- a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.TestHoodieClientBase;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -44,15 +45,16 @@ public class TestHoodieStorageWriterFactory extends TestHoodieClientBase {
     final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet");
     final HoodieWriteConfig cfg = getConfig();
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+    SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
     HoodieStorageWriter<IndexedRecord> parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime,
-        parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA);
+        parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
     Assert.assertTrue(parquetWriter instanceof HoodieParquetWriter);
 
     // other file format exception.
    final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
    try {
      HoodieStorageWriter<IndexedRecord> logWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, logPath,
-          table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA);
+          table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
       fail("should fail since log storage writer is not supported yet.");
     } catch (Exception e) {
       Assert.assertTrue(e instanceof UnsupportedOperationException);
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index c88233d..9ff0f97 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -103,7 +103,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
       when(record.getPartitionPath()).thenReturn(partitionPath);
       String writeToken = FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
           TaskContext.get().taskAttemptId());
-      HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName);
+      HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName, supplier);
       return Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken);
     }).collect().get(0);
 
