This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 07c3c5d  [HUDI-679] Make io package Spark free (#1460)
07c3c5d is described below

commit 07c3c5d797f612f9b73b2805b65275c2029c2442
Author: leesf <490081...@qq.com>
AuthorDate: Sun Mar 29 16:54:00 2020 +0800

    [HUDI-679] Make io package Spark free (#1460)

    * [HUDI-679] Make io package Spark free
---
 .../scala/org/apache/hudi/cli/SparkHelpers.scala    |  3 +-
 .../hudi/client/SparkTaskContextSupplier.java       | 42 ++++++++++++++++++++++
 .../hudi/execution/BulkInsertMapFunction.java       |  2 +-
 .../execution/CopyOnWriteLazyInsertIterable.java    | 10 ++++--
 .../execution/MergeOnReadLazyInsertIterable.java    |  9 ++---
 .../org/apache/hudi/io/HoodieAppendHandle.java      | 14 ++++----
 .../org/apache/hudi/io/HoodieCreateHandle.java      | 14 ++++----
 .../java/org/apache/hudi/io/HoodieMergeHandle.java  | 14 ++++----
 .../java/org/apache/hudi/io/HoodieWriteHandle.java  | 25 +++++++++----
 .../hudi/io/storage/HoodieParquetWriter.java        | 10 +++---
 .../io/storage/HoodieStorageWriterFactory.java      | 13 +++----
 .../apache/hudi/table/HoodieCopyOnWriteTable.java   |  8 ++---
 .../apache/hudi/table/HoodieMergeOnReadTable.java   |  4 +--
 .../java/org/apache/hudi/table/HoodieTable.java     |  7 ++++
 .../hudi/client/TestUpdateSchemaEvolution.java      |  4 +--
 .../hudi/common/HoodieClientTestHarness.java        |  3 ++
 .../apache/hudi/common/HoodieClientTestUtils.java   |  4 ++-
 .../io/storage/TestHoodieStorageWriterFactory.java  |  6 ++--
 .../apache/hudi/table/TestCopyOnWriteTable.java     |  2 +-
 19 files changed, 136 insertions(+), 58 deletions(-)

diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
index 4c8e4c1..6fdac1c 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -22,6 +22,7 @@ import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.avro.HoodieAvroWriteSupport
+import org.apache.hudi.client.SparkTaskContextSupplier
 import org.apache.hudi.common.HoodieJsonPayload
 import org.apache.hudi.common.bloom.filter.{BloomFilter, BloomFilterFactory}
 import org.apache.hudi.common.model.HoodieRecord
@@ -45,7 +46,7 @@ object SparkHelpers {
       HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
     val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
-    val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema)
+    val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier())
     for (rec <- sourceRecords) {
       val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
       if (!keysToSkip.contains(key)) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java b/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
new
file mode 100644 index 0000000..601dd98 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.spark.TaskContext; + +import java.io.Serializable; +import java.util.function.Supplier; + +/** + * Spark task context supplier. + */ +public class SparkTaskContextSupplier implements Serializable { + + public Supplier<Integer> getPartitionIdSupplier() { + return () -> TaskContext.getPartitionId(); + } + + public Supplier<Integer> getStageIdSupplier() { + return () -> TaskContext.get().stageId(); + } + + public Supplier<Long> getAttemptIdSupplier() { + return () -> TaskContext.get().taskAttemptId(); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java index 249ff3d..5d4391c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java +++ b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java @@ -51,6 +51,6 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload> @Override public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) { return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable, - fileIDPrefixes.get(partition)); + fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier()); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java index bdcea61..8f98496 100644 --- a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java +++ b/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java @@ -18,6 +18,7 @@ package org.apache.hudi.execution; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.LazyIterableIterator; import org.apache.hudi.common.model.HoodieRecord; @@ -50,15 +51,18 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> protected final HoodieTable<T> hoodieTable; protected final String idPrefix; protected int numFilesWritten; + protected SparkTaskContextSupplier sparkTaskContextSupplier; public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config, - String instantTime, HoodieTable<T> hoodieTable, String idPrefix) { + String instantTime, HoodieTable<T> hoodieTable, String idPrefix, + SparkTaskContextSupplier sparkTaskContextSupplier) { 
super(sortedRecordItr); this.hoodieConfig = config; this.instantTime = instantTime; this.hoodieTable = hoodieTable; this.idPrefix = idPrefix; this.numFilesWritten = 0; + this.sparkTaskContextSupplier = sparkTaskContextSupplier; } // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread. @@ -137,7 +141,7 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> // lazily initialize the handle, for the first time if (handle == null) { handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(), - getNextFileId(idPrefix)); + getNextFileId(idPrefix), sparkTaskContextSupplier); } if (handle.canWrite(payload.record)) { @@ -148,7 +152,7 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> statuses.add(handle.close()); // Need to handle the rejected payload & open new handle handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(), - getNextFileId(idPrefix)); + getNextFileId(idPrefix), sparkTaskContextSupplier); handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload. } } diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java index 11c0035..02a9ead 100644 --- a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java +++ b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java @@ -18,6 +18,7 @@ package org.apache.hudi.execution; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -35,8 +36,8 @@ import java.util.List; public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extends CopyOnWriteLazyInsertIterable<T> { public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config, - String instantTime, HoodieTable<T> hoodieTable, String idPfx) { - super(sortedRecordItr, config, instantTime, hoodieTable, idPfx); + String instantTime, HoodieTable<T> hoodieTable, String idPfx, SparkTaskContextSupplier sparkTaskContextSupplier) { + super(sortedRecordItr, config, instantTime, hoodieTable, idPfx, sparkTaskContextSupplier); } @Override @@ -53,7 +54,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend // lazily initialize the handle, for the first time if (handle == null) { handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable, - insertPayload.getPartitionPath(), getNextFileId(idPrefix)); + insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier); } if (handle.canWrite(insertPayload)) { // write the payload, if the handle has capacity @@ -64,7 +65,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend statuses.add(handle.getWriteStatus()); // Need to handle the rejected payload & open new handle handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable, - insertPayload.getPartitionPath(), getNextFileId(idPrefix)); + insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier); handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload. 
} } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index f1bd57c..0c08734 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieDeltaWriteStat; @@ -49,7 +50,6 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.TaskContext; import org.apache.spark.util.SizeEstimator; import java.io.IOException; @@ -101,16 +101,16 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri private long insertRecordsWritten = 0; public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, - String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) { - super(config, instantTime, partitionPath, fileId, hoodieTable); + String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, SparkTaskContextSupplier sparkTaskContextSupplier) { + super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier); writeStatus.setStat(new HoodieDeltaWriteStat()); this.fileId = fileId; this.recordItr = recordItr; } public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, - String partitionPath, String fileId) { - this(config, instantTime, hoodieTable, partitionPath, fileId, null); + String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) { + this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier); } private void init(HoodieRecord record) { @@ -137,7 +137,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri //save hoodie partition meta in the partition path HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime, new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); - partitionMetadata.trySave(TaskContext.getPartitionId()); + partitionMetadata.trySave(getPartitionId()); this.writer = createLogWriter(fileSlice, baseInstantTime); this.currentLogFile = writer.getLogFile(); ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion()); @@ -163,7 +163,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get())); String seqId = - HoodieRecord.generateSequenceId(instantTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement()); + HoodieRecord.generateSequenceId(instantTime, getPartitionId(), recordIndex.getAndIncrement()); HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(), fileId); HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 1ab22e0..dd8bdac 
100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; @@ -38,7 +39,6 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.TaskContext; import java.io.IOException; import java.util.Iterator; @@ -56,8 +56,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri private boolean useWriterSchema = false; public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, - String partitionPath, String fileId) { - super(config, instantTime, partitionPath, fileId, hoodieTable); + String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) { + super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier); writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); @@ -66,10 +66,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri try { HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime, new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); - partitionMetadata.trySave(TaskContext.getPartitionId()); + partitionMetadata.trySave(getPartitionId()); createMarkerFile(partitionPath); this.storageWriter = - HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema); + HoodieStorageWriterFactory.getStorageWriter(instantTime, path, hoodieTable, config, writerSchema, this.sparkTaskContextSupplier); } catch (IOException e) { throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e); } @@ -80,8 +80,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri * Called by the compactor code path. 
*/ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, - String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator) { - this(config, instantTime, hoodieTable, partitionPath, fileId); + String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) { + this(config, instantTime, hoodieTable, partitionPath, fileId, sparkTaskContextSupplier); this.recordIterator = recordIterator; this.useWriterSchema = true; } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 3a81340..5b95cd0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.SparkConfigUtils; import org.apache.hudi.common.model.HoodieBaseFile; @@ -46,7 +47,6 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.TaskContext; import java.io.IOException; import java.util.HashSet; @@ -71,8 +71,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit private boolean useWriterSchema; public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, - Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId) { - super(config, instantTime, partitionPath, fileId, hoodieTable); + Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) { + super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier); init(fileId, recordItr); init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get()); } @@ -82,8 +82,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit */ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId, - HoodieBaseFile dataFileToBeMerged) { - super(config, instantTime, partitionPath, fileId, hoodieTable); + HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) { + super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier); this.keyToNewRecords = keyToNewRecords; this.useWriterSchema = true; init(fileId, this.partitionPath, dataFileToBeMerged); @@ -111,7 +111,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime, new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); - partitionMetadata.trySave(TaskContext.getPartitionId()); + partitionMetadata.trySave(getPartitionId()); oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath); String relativePath = new Path((partitionPath.isEmpty() ? 
"" : partitionPath + "/") @@ -132,7 +132,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit // Create the writer for writing the new version file storageWriter = - HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema); + HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier); } catch (IOException io) { LOG.error("Error in update task at commit " + instantTime, io); writeStatus.setGlobalError(io); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 336e508..dd67a6a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -38,7 +39,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.TaskContext; import java.io.IOException; @@ -55,26 +55,27 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H protected final String partitionPath; protected final String fileId; protected final String writeToken; + protected final SparkTaskContextSupplier sparkTaskContextSupplier; public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, - String fileId, HoodieTable<T> hoodieTable) { + String fileId, HoodieTable<T> hoodieTable, SparkTaskContextSupplier sparkTaskContextSupplier) { super(config, instantTime, hoodieTable); this.partitionPath = partitionPath; this.fileId = fileId; - this.writeToken = makeSparkWriteToken(); this.originalSchema = new Schema.Parser().parse(config.getSchema()); this.writerSchema = createHoodieWriteSchema(originalSchema); this.timer = new HoodieTimer().startTimer(); this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(), !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction()); + this.sparkTaskContextSupplier = sparkTaskContextSupplier; + this.writeToken = makeWriteToken(); } /** * Generate a write token based on the currently running spark task and its place in the spark dag. 
*/ - private static String makeSparkWriteToken() { - return FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(), - TaskContext.get().taskAttemptId()); + private String makeWriteToken() { + return FSUtils.makeWriteToken(getPartitionId(), getStageId(), getAttemptId()); } public static Schema createHoodieWriteSchema(Schema originalSchema) { @@ -171,4 +172,16 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H protected FileSystem getFileSystem() { return hoodieTable.getMetaClient().getFs(); } + + protected int getPartitionId() { + return sparkTaskContextSupplier.getPartitionIdSupplier().get(); + } + + protected int getStageId() { + return sparkTaskContextSupplier.getStageIdSupplier().get(); + } + + protected long getAttemptId() { + return sparkTaskContextSupplier.getAttemptIdSupplier().get(); + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java index ad8987a..473a806 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java @@ -19,6 +19,7 @@ package org.apache.hudi.io.storage; import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.common.io.storage.HoodieWrapperFileSystem; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -32,7 +33,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.spark.TaskContext; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; @@ -52,9 +52,10 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe private final HoodieAvroWriteSupport writeSupport; private final String instantTime; private final Schema schema; + private final SparkTaskContextSupplier sparkTaskContextSupplier; - public HoodieParquetWriter(String instantTime, Path file, HoodieParquetConfig parquetConfig, Schema schema) - throws IOException { + public HoodieParquetWriter(String instantTime, Path file, HoodieParquetConfig parquetConfig, + Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException { super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()), ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(), parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(), @@ -72,6 +73,7 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe this.writeSupport = parquetConfig.getWriteSupport(); this.instantTime = instantTime; this.schema = schema; + this.sparkTaskContextSupplier = sparkTaskContextSupplier; } public static Configuration registerFileSystem(Path file, Configuration conf) { @@ -85,7 +87,7 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe @Override public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException { String seqId = - HoodieRecord.generateSequenceId(instantTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement()); + HoodieRecord.generateSequenceId(instantTime, sparkTaskContextSupplier.getPartitionIdSupplier().get(), 
recordIndex.getAndIncrement()); HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), file.getName()); HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java index 538c2ca..09f8ba9 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java @@ -19,6 +19,7 @@ package org.apache.hudi.io.storage; import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.common.bloom.filter.BloomFilter; import org.apache.hudi.common.bloom.filter.BloomFilterFactory; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -39,19 +40,19 @@ import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; public class HoodieStorageWriterFactory { public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter( - String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config, Schema schema) - throws IOException { + String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config, Schema schema, + SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException { final String name = path.getName(); final String extension = FSUtils.isLogFile(path) ? HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name); if (PARQUET.getFileExtension().equals(extension)) { - return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable); + return newParquetStorageWriter(instantTime, path, config, schema, hoodieTable, sparkTaskContextSupplier); } throw new UnsupportedOperationException(extension + " format not supported yet."); } private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter( - String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable) - throws IOException { + String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable, + SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException { BloomFilter filter = BloomFilterFactory .createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(), config.getDynamicBloomFilterMaxNumEntries(), @@ -63,6 +64,6 @@ public class HoodieStorageWriterFactory { config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(), hoodieTable.getHadoopConf(), config.getParquetCompressionRatio()); - return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema); + return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, sparkTaskContextSupplier); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index 0b16efe..250bc51 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -222,13 +222,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi } protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, 
String fileId, Iterator<HoodieRecord<T>> recordItr) { - return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId); + return new HoodieMergeHandle<>(config, instantTime, this, recordItr, partitionPath, fileId, sparkTaskContextSupplier); } protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) { return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, - partitionPath, fileId, dataFileToBeMerged); + partitionPath, fileId, dataFileToBeMerged, sparkTaskContextSupplier); } public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr) @@ -238,13 +238,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi LOG.info("Empty partition"); return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator(); } - return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx); + return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier); } public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) { HoodieCreateHandle createHandle = - new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr); + new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr, sparkTaskContextSupplier); createHandle.write(); return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator(); } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index a7c5a68..4690382 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -108,7 +108,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi return super.handleUpdate(instantTime, partitionPath, fileId, recordItr); } else { HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, instantTime, this, - partitionPath, fileId, recordItr); + partitionPath, fileId, recordItr, sparkTaskContextSupplier); appendHandle.doAppend(); appendHandle.close(); return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator(); @@ -120,7 +120,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi throws Exception { // If canIndexLogFiles, write inserts to log files else write inserts to parquet files if (index.canIndexLogFiles()) { - return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx); + return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier); } else { return super.handleInsert(instantTime, idPfx, recordItr); } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index e38510f..a05e28f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.client.SparkTaskContextSupplier; import 
org.apache.hudi.client.WriteStatus; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; @@ -84,6 +85,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri private SerializableConfiguration hadoopConfiguration; private transient FileSystemViewManager viewManager; + protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier(); + protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc) { this.config = config; this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration()); @@ -448,4 +451,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) { return new FailSafeConsistencyGuard(fileSystem, config.getConsistencyGuardConfig()); } + + public SparkTaskContextSupplier getSparkTaskContextSupplier() { + return sparkTaskContextSupplier; + } } diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index 8c949fb..19919c7 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -89,7 +89,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness { .add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3)); HoodieCreateHandle createHandle = - new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecords.iterator()); + new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecords.iterator(), supplier); createHandle.write(); return createHandle.close(); }).collect(); @@ -119,7 +119,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness { try { HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2, - updateRecords.iterator(), record1.getPartitionPath(), fileId); + updateRecords.iterator(), record1.getPartitionPath(), fileId, supplier); Configuration conf = new Configuration(); AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchema()); List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf, diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java index 4e5721f..c5c7d82 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java @@ -17,6 +17,7 @@ package org.apache.hudi.common; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.TestHoodieClientBase; import org.apache.hudi.common.minicluster.HdfsTestService; import org.apache.hudi.common.model.HoodieTestUtils; @@ -55,6 +56,8 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im protected transient HoodieTableMetaClient metaClient; private static AtomicInteger instantGen = new AtomicInteger(1); + protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); + public String getNextInstant() { return String.format("%09d", instantGen.getAndIncrement()); } diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java 
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java index b85de2c..b4601c2 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.common; import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.filter.BloomFilter; @@ -230,7 +231,8 @@ public class HoodieClientTestUtils { ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, HoodieTestUtils.getDefaultHadoopConf(), Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO)); HoodieParquetWriter writer = - new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config, schema); + new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config, + schema, new SparkTaskContextSupplier()); int seqId = 1; for (HoodieRecord record : records) { GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java index 6758377..a1492dd 100755 --- a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java @@ -18,6 +18,7 @@ package org.apache.hudi.io.storage; +import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.TestHoodieClientBase; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.config.HoodieWriteConfig; @@ -44,15 +45,16 @@ public class TestHoodieStorageWriterFactory extends TestHoodieClientBase { final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"); final HoodieWriteConfig cfg = getConfig(); HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); HoodieStorageWriter<IndexedRecord> parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, - parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA); + parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); Assert.assertTrue(parquetWriter instanceof HoodieParquetWriter); // other file format exception. 
final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1"); try { HoodieStorageWriter<IndexedRecord> logWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, logPath, - table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA); + table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); fail("should fail since log storage writer is not supported yet."); } catch (Exception e) { Assert.assertTrue(e instanceof UnsupportedOperationException); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java index c88233d..9ff0f97 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java @@ -103,7 +103,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { when(record.getPartitionPath()).thenReturn(partitionPath); String writeToken = FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); - HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName); + HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName, supplier); return Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken); }).collect().get(0);
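
The change above routes every partition/stage/attempt lookup in the io package through SparkTaskContextSupplier instead of calling Spark's TaskContext directly, so handles and storage writers no longer import Spark. As a rough illustration of what that indirection buys (a hypothetical sketch, not part of this commit: the class name LocalTaskContextSupplier and its fixed IDs are invented here for the example), a caller that does not run inside a Spark task could hand the handles a supplier of its own:

package org.apache.hudi.client;

import java.util.function.Supplier;

/**
 * Hypothetical supplier for running write handles outside a Spark executor
 * (for example in a plain-JVM test): it returns fixed IDs instead of reading
 * them from Spark's TaskContext at call time.
 */
public class LocalTaskContextSupplier extends SparkTaskContextSupplier {

  @Override
  public Supplier<Integer> getPartitionIdSupplier() {
    return () -> 0;   // single local "partition"
  }

  @Override
  public Supplier<Integer> getStageIdSupplier() {
    return () -> 0;   // no Spark stage in a local run
  }

  @Override
  public Supplier<Long> getAttemptIdSupplier() {
    return () -> 0L;  // first and only attempt
  }
}

With such a supplier passed into HoodieWriteHandle, makeWriteToken() builds the write token from getPartitionId(), getStageId(), and getAttemptId() (as shown in the HoodieWriteHandle hunk above) without touching TaskContext, which is what makes the io package Spark free.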