This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new a0d7ab2  HUDI-70 : Making DeltaStreamer run in continuous mode with concurrent compaction
a0d7ab2 is described below

commit a0d7ab238473f22347e140b0e1e273ab80583eb7
Author: Balaji Varadarajan <varad...@uber.com>
AuthorDate: Wed May 15 13:21:55 2019 -0700

    HUDI-70 : Making DeltaStreamer run in continuous mode with concurrent compaction
---
 .../java/com/uber/hoodie/AbstractHoodieClient.java |  45 +-
 .../com/uber/hoodie/CompactionAdminClient.java     |   6 +
 .../java/com/uber/hoodie/HoodieReadClient.java     |  27 +-
 .../java/com/uber/hoodie/HoodieWriteClient.java    |  14 +-
 .../client/embedded/EmbeddedTimelineService.java   |   4 +
 .../hoodie/common/HoodieTestDataGenerator.java     |  99 ++-
 .../uber/hoodie/table/TestMergeOnReadTable.java    |   1 +
 .../hoodie/common/table/HoodieTableMetaClient.java |   2 +-
 .../table/view/RocksDbBasedFileSystemView.java     |   4 +-
 .../uber/hoodie/common/util/CompactionUtils.java   |   8 +
 .../com/uber/hoodie/common/util/RocksDBDAO.java    |  91 ++-
 .../common/util/collection/DiskBasedMap.java       |   3 +-
 .../common/util/collection/RocksDBBasedMap.java    | 125 ++++
 .../hoodie/common/util/TestRocksDBManager.java     |   2 +-
 .../util/collection/TestRocksDbBasedMap.java       |  56 ++
 .../main/java/com/uber/hoodie/DataSourceUtils.java |  12 +-
 .../com/uber/hoodie/HoodieSparkSqlWriter.scala     |  33 +-
 hoodie-spark/src/test/scala/DataSourceTest.scala   |   3 +-
 hoodie-utilities/pom.xml                           |  17 +-
 .../com/uber/hoodie/utilities/UtilHelpers.java     |  17 +-
 .../AbstractDeltaStreamerService.java              | 146 +++++
 .../hoodie/utilities/deltastreamer/Compactor.java  |  62 ++
 .../{HoodieDeltaStreamer.java => DeltaSync.java}   | 438 +++++++------
 .../deltastreamer/HoodieDeltaStreamer.java         | 674 +++++++++++----------
 .../deltastreamer/SchedulerConfGenerator.java      |  94 +++
 .../hoodie/utilities/TestHoodieDeltaStreamer.java  | 106 ++++
 .../utilities/sources/AbstractBaseTestSource.java  | 103 ++++
 .../sources/DistributedTestDataSource.java         |  79 +++
 .../hoodie/utilities/sources/TestDataSource.java   |  46 +-
 .../utilities/sources/config/TestSourceConfig.java |  43 ++
 packaging/hoodie-utilities-bundle/pom.xml          |   5 +
 pom.xml                                            |   5 +
 32 files changed, 1709 insertions(+), 661 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java b/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
index 243ecf5..5aa051c 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
@@ -23,6 +23,7 @@ import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Optional;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -47,13 +48,21 @@ public abstract class AbstractHoodieClient implements Serializable {
   * of the cached file-system view. New completed actions will be synced automatically
   * in an incremental fashion.
*/ - private transient EmbeddedTimelineService timelineServer; + private transient Optional<EmbeddedTimelineService> timelineServer; + private final boolean shouldStopTimelineServer; protected AbstractHoodieClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) { + this(jsc, clientConfig, Optional.empty()); + } + + protected AbstractHoodieClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, + Optional<EmbeddedTimelineService> timelineServer) { this.fs = FSUtils.getFs(clientConfig.getBasePath(), jsc.hadoopConfiguration()); this.jsc = jsc; this.basePath = clientConfig.getBasePath(); this.config = clientConfig; + this.timelineServer = timelineServer; + shouldStopTimelineServer = !timelineServer.isPresent(); startEmbeddedServerView(); } @@ -65,28 +74,30 @@ public abstract class AbstractHoodieClient implements Serializable { } private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) { - if (timelineServer != null) { + if (timelineServer.isPresent() && shouldStopTimelineServer) { + // Stop only if owner logger.info("Stopping Timeline service !!"); - timelineServer.stop(); - timelineServer = null; - // Reset Storage Config to Client specified config - if (resetViewStorageConfig) { - config.resetViewStorageConfig(); - } + timelineServer.get().stop(); + } + + timelineServer = Optional.empty(); + // Reset Storage Config to Client specified config + if (resetViewStorageConfig) { + config.resetViewStorageConfig(); } } private synchronized void startEmbeddedServerView() { if (config.isEmbeddedTimelineServerEnabled()) { - if (timelineServer == null) { + if (!timelineServer.isPresent()) { // Run Embedded Timeline Server logger.info("Starting Timeline service !!"); - timelineServer = new EmbeddedTimelineService(jsc.hadoopConfiguration(), jsc.getConf(), - config.getClientSpecifiedViewStorageConfig()); + timelineServer = Optional.of(new EmbeddedTimelineService(jsc.hadoopConfiguration(), jsc.getConf(), + config.getClientSpecifiedViewStorageConfig())); try { - timelineServer.startServer(); + timelineServer.get().startServer(); // Allow executor to find this newly instantiated timeline service - config.setViewStorageConfig(timelineServer.getRemoteFileSystemViewConfig()); + config.setViewStorageConfig(timelineServer.get().getRemoteFileSystemViewConfig()); } catch (IOException e) { logger.warn("Unable to start timeline service. Proceeding as if embedded server is disabled", e); stopEmbeddedServerView(false); @@ -98,4 +109,12 @@ public abstract class AbstractHoodieClient implements Serializable { logger.info("Embedded Timeline Server is disabled. 
Not starting timeline service"); } } + + public HoodieWriteConfig getConfig() { + return config; + } + + public Optional<EmbeddedTimelineService> getTimelineServer() { + return timelineServer; + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java index 751e7dc..d2b007f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java @@ -24,6 +24,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.uber.hoodie.avro.model.HoodieCompactionOperation; import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.client.embedded.EmbeddedTimelineService; import com.uber.hoodie.common.model.CompactionOperation; import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieDataFile; @@ -68,6 +69,11 @@ public class CompactionAdminClient extends AbstractHoodieClient { super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build()); } + public CompactionAdminClient(JavaSparkContext jsc, String basePath, + java.util.Optional<EmbeddedTimelineService> timelineServer) { + super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build(), timelineServer); + } + /** * Validate all compaction operations in a compaction plan. Verifies the file-slices are consistent with corresponding * compaction operations. diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index a1e7ab7..86c2fa5 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -20,6 +20,7 @@ package com.uber.hoodie; import com.google.common.base.Optional; import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.client.embedded.EmbeddedTimelineService; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; @@ -69,12 +70,20 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo /** * @param basePath path to Hoodie dataset */ - public HoodieReadClient(JavaSparkContext jsc, String basePath) { + public HoodieReadClient(JavaSparkContext jsc, String basePath, + java.util.Optional<EmbeddedTimelineService> timelineService) { this(jsc, HoodieWriteConfig.newBuilder().withPath(basePath) // by default we use HoodieBloomIndex .withIndexConfig( HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .build()); + .build(), timelineService); + } + + /** + * @param basePath path to Hoodie dataset + */ + public HoodieReadClient(JavaSparkContext jsc, String basePath) { + this(jsc, basePath, java.util.Optional.empty()); } /** @@ -91,13 +100,19 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo * @param clientConfig instance of HoodieWriteConfig */ public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) { - super(jsc, clientConfig); + this(jsc, clientConfig, java.util.Optional.empty()); + } + + /** + * @param clientConfig instance of HoodieWriteConfig + */ + public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, + java.util.Optional<EmbeddedTimelineService> timelineService) { + super(jsc, clientConfig, timelineService); final String 
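
The ownership rule encoded above (stopEmbeddedServerView stops the timeline service only when shouldStopTimelineServer is true, i.e. when this client started it) is what lets several clients share one embedded server. A minimal sketch of the resulting usage pattern, using the constructors added in this commit; the variable names and the pre-built index are hypothetical:

    // One write client starts and owns the embedded timeline service; a read
    // client and a compaction-admin client borrow it and will not stop it.
    HoodieWriteClient<HoodieRecordPayload> writeClient =
        new HoodieWriteClient<>(jsc, writeConfig, true, index, Optional.empty());
    Optional<EmbeddedTimelineService> shared = writeClient.getTimelineServer();
    HoodieReadClient<HoodieRecordPayload> readClient =
        new HoodieReadClient<>(jsc, writeConfig.getBasePath(), shared);
    CompactionAdminClient adminClient =
        new CompactionAdminClient(jsc, writeConfig.getBasePath(), shared);
    // Closing readClient or adminClient leaves the server running; closing
    // writeClient (the owner) stops it.
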
basePath = clientConfig.getBasePath(); // Create a Hoodie table which encapsulated the commits and files visible HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); - this.hoodieTable = HoodieTable - .getHoodieTable(metaClient, - clientConfig, jsc); + this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc); this.commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants(); this.index = HoodieIndex.createIndex(clientConfig, jsc); this.sqlContextOpt = Optional.absent(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 51776be..a9b548b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -27,6 +27,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.avro.model.HoodieRestoreMetadata; import com.uber.hoodie.avro.model.HoodieRollbackMetadata; import com.uber.hoodie.avro.model.HoodieSavepointMetadata; +import com.uber.hoodie.client.embedded.EmbeddedTimelineService; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieCommitMetadata; @@ -75,6 +76,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; @@ -124,7 +126,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo @VisibleForTesting HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight, HoodieIndex index) { - super(jsc, clientConfig); + this(jsc, clientConfig, rollbackInFlight, index, Optional.empty()); + } + + public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, + boolean rollbackInFlight, HoodieIndex index, Optional<EmbeddedTimelineService> timelineService) { + super(jsc, clientConfig, timelineService); this.index = index; this.metrics = new HoodieMetrics(config, config.getTableName()); this.rollbackInFlight = rollbackInFlight; @@ -1184,7 +1191,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo private HoodieTable getTableAndInitCtx(JavaRDD<HoodieRecord<T>> records) { // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + new HoodieTableMetaClient( + // Clone Configuration here. Otherwise we could see ConcurrentModificationException (race) in multi-threaded + // execution (HoodieDeltaStreamer) when Configuration gets serialized by Spark. 
+ new Configuration(jsc.hadoopConfiguration()), config.getBasePath(), true), config, jsc); if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { writeContext = metrics.getCommitCtx(); } else { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/client/embedded/EmbeddedTimelineService.java b/hoodie-client/src/main/java/com/uber/hoodie/client/embedded/EmbeddedTimelineService.java index 91a925e..fbd54c9 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/client/embedded/EmbeddedTimelineService.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/client/embedded/EmbeddedTimelineService.java @@ -91,6 +91,10 @@ public class EmbeddedTimelineService { .withRemoteServerHost(hostAddr).withRemoteServerPort(serverPort).build(); } + public FileSystemViewManager getViewManager() { + return viewManager; + } + public void stop() { if (null != server) { this.server.close(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index f50baaf..1f79bb8 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -31,16 +31,23 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; +import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -83,17 +90,23 @@ public class HoodieTestDataGenerator { private static Random rand = new Random(46474747); - private List<KeyPartition> existingKeysList = new ArrayList<>(); - private String[] partitionPaths; + private final Map<Integer, KeyPartition> existingKeys; + private final String[] partitionPaths; + private int numExistingKeys; public HoodieTestDataGenerator(String[] partitionPaths) { - this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length); + this(partitionPaths, new HashMap<>()); } public HoodieTestDataGenerator() { this(DEFAULT_PARTITION_PATHS); } + public HoodieTestDataGenerator(String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) { + this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length); + this.existingKeys = keyPartitionMap; + } + public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) { for (String partitionPath : partitionPaths) { new HoodiePartitionMetadata(fs, "000", new Path(basePath), new Path(basePath, partitionPath)).trySave(0); @@ -193,19 +206,29 @@ public class HoodieTestDataGenerator { * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. 
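
The cloning just above guards a real race: Hadoop's Configuration is mutable, and Spark serializes it when submitting tasks, so a concurrent writer can trigger ConcurrentModificationException mid-serialization. A short sketch of the defensive-copy rule this commit applies in HoodieWriteClient and HoodieTableMetaClient; variable names are hypothetical:

    // Snapshot the shared, mutable Configuration before handing it to code
    // that may serialize or iterate it on another thread.
    Configuration shared = jsc.hadoopConfiguration();   // mutated by other threads
    Configuration snapshot = new Configuration(shared); // cheap, isolated copy
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(snapshot, config.getBasePath(), true);
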
*/ public List<HoodieRecord> generateInserts(String commitTime, Integer n) throws IOException { - List<HoodieRecord> inserts = new ArrayList<>(); - for (int i = 0; i < n; i++) { + return generateInsertsStream(commitTime, n).collect(Collectors.toList()); + } + + /** + * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. + */ + public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n) { + int currSize = getNumExistingKeys(); + + return IntStream.range(0, n).boxed().map(i -> { String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)]; HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); - HoodieRecord record = new HoodieRecord(key, generateRandomValue(key, commitTime)); - inserts.add(record); - KeyPartition kp = new KeyPartition(); kp.key = key; kp.partitionPath = partitionPath; - existingKeysList.add(kp); - } - return inserts; + existingKeys.put(currSize + i, kp); + numExistingKeys++; + try { + return new HoodieRecord(key, generateRandomValue(key, commitTime)); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }); } public List<HoodieRecord> generateSameKeyInserts(String commitTime, List<HoodieRecord> origin) throws IOException { @@ -221,6 +244,7 @@ public class HoodieTestDataGenerator { public List<HoodieRecord> generateInsertsWithHoodieAvroPayload(String commitTime, int limit) throws IOException { List<HoodieRecord> inserts = new ArrayList<>(); + int currSize = getNumExistingKeys(); for (int i = 0; i < limit; i++) { String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)]; HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); @@ -230,7 +254,8 @@ public class HoodieTestDataGenerator { KeyPartition kp = new KeyPartition(); kp.key = key; kp.partitionPath = partitionPath; - existingKeysList.add(kp); + existingKeys.put(currSize + i, kp); + numExistingKeys++; } return inserts; } @@ -293,7 +318,7 @@ public class HoodieTestDataGenerator { public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws IOException { List<HoodieRecord> updates = new ArrayList<>(); for (int i = 0; i < n; i++) { - KeyPartition kp = existingKeysList.get(rand.nextInt(existingKeysList.size() - 1)); + KeyPartition kp = existingKeys.get(rand.nextInt(numExistingKeys - 1)); HoodieRecord record = generateUpdateRecord(kp.key, commitTime); updates.add(record); } @@ -307,39 +332,55 @@ public class HoodieTestDataGenerator { * @param n Number of unique records * @return list of hoodie record updates */ - public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer n) throws IOException { - List<HoodieRecord> updates = new ArrayList<>(); - Set<KeyPartition> used = new HashSet<>(); + public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer n) { + return generateUniqueUpdatesStream(commitTime, n).collect(Collectors.toList()); + } + + /** + * Generates deduped updates of keys previously inserted, randomly distributed across the keys above. 
+ * + * @param commitTime Commit Timestamp + * @param n Number of unique records + * @return stream of hoodie record updates + */ + public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) { + final Set<KeyPartition> used = new HashSet<>(); - if (n > existingKeysList.size()) { + if (n > numExistingKeys) { throw new IllegalArgumentException("Requested unique updates is greater than number of available keys"); } - for (int i = 0; i < n; i++) { - int index = rand.nextInt(existingKeysList.size() - 1); - KeyPartition kp = existingKeysList.get(index); + return IntStream.range(0, n).boxed().map(i -> { + int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1); + KeyPartition kp = existingKeys.get(index); // Find the available keyPartition starting from randomly chosen one. while (used.contains(kp)) { - index = (index + 1) % existingKeysList.size(); - kp = existingKeysList.get(index); + index = (index + 1) % numExistingKeys; + kp = existingKeys.get(index); } - HoodieRecord record = new HoodieRecord(kp.key, generateRandomValue(kp.key, commitTime)); - updates.add(record); used.add(kp); - } - return updates; + try { + return new HoodieRecord(kp.key, generateRandomValue(kp.key, commitTime)); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }); } public String[] getPartitionPaths() { return partitionPaths; } - public List<KeyPartition> getExistingKeysList() { - return existingKeysList; + public int getNumExistingKeys() { + return numExistingKeys; } - public static class KeyPartition { + public static class KeyPartition implements Serializable { HoodieKey key; String partitionPath; } + + public void close() { + existingKeys.clear(); + } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index 5f22de0..2a663c3 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -905,6 +905,7 @@ public class TestMergeOnReadTable { .filter(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)) .count() > 0); } + writeClient.close(); } @Test diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java index c8dd7d2..7551748 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java @@ -85,7 +85,7 @@ public class HoodieTableMetaClient implements Serializable { throws DatasetNotFoundException { log.info("Loading HoodieTableMetaClient from " + basePath); this.basePath = basePath; - this.hadoopConf = new SerializableConfiguration(conf); + this.hadoopConf = new SerializableConfiguration(new Configuration(conf)); Path basePathDir = new Path(this.basePath); this.metaPath = basePath + File.separator + METAFOLDER_NAME; Path metaPathDir = new Path(this.metaPath); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java index dc371bc..9edbb94 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java +++ 
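
With existing keys now tracked in a map keyed by insertion order (and KeyPartition made Serializable), the test generator can hand out records lazily. A hypothetical usage of the new stream-returning APIs shown above:

    // Generate inserts lazily, then deduped updates against those same keys.
    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> inserts =
        dataGen.generateInsertsStream("000", 100).collect(Collectors.toList());
    List<HoodieRecord> updates =
        dataGen.generateUniqueUpdatesStream("001", 50).collect(Collectors.toList());
    // Updates reuse existing keys, so the key count stays at 100.
    assert dataGen.getNumExistingKeys() == 100;
    dataGen.close(); // clears the shared key map
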
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java @@ -74,7 +74,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste super(config.isIncrementalTimelineSyncEnabled()); this.config = config; this.schemaHelper = new RocksDBSchemaHelper(metaClient); - this.rocksDB = new RocksDBDAO(metaClient.getBasePath(), config); + this.rocksDB = new RocksDBDAO(metaClient.getBasePath(), config.getRocksdbBasePath()); init(metaClient, visibleActiveTimeline); } @@ -138,7 +138,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste protected void resetViewState() { log.info("Deleting all rocksdb data associated with dataset filesystem view"); rocksDB.close(); - rocksDB = new RocksDBDAO(metaClient.getBasePath(), config); + rocksDB = new RocksDBDAO(metaClient.getBasePath(), config.getRocksdbBasePath()); } @Override diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java index cc188a7..e0927ce 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java @@ -174,4 +174,12 @@ public class CompactionUtils { return Stream.empty(); } } + + /** + * Return all pending compaction instant times + * @return + */ + public static List<HoodieInstant> getPendingCompactionInstantTimes(HoodieTableMetaClient metaClient) { + return metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList()); + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/RocksDBDAO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/RocksDBDAO.java index 5b7d9cc..739c778 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/RocksDBDAO.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/RocksDBDAO.java @@ -20,7 +20,6 @@ package com.uber.hoodie.common.util; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig; import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; @@ -57,7 +56,6 @@ public class RocksDBDAO { protected static final transient Logger log = LogManager.getLogger(RocksDBDAO.class); - private final FileSystemViewStorageConfig config; private transient ConcurrentHashMap<String, ColumnFamilyHandle> managedHandlesMap; private transient ConcurrentHashMap<String, ColumnFamilyDescriptor> managedDescriptorMap; private transient RocksDB rocksDB; @@ -65,10 +63,9 @@ public class RocksDBDAO { private final String basePath; private final String rocksDBBasePath; - public RocksDBDAO(String basePath, FileSystemViewStorageConfig config) { + public RocksDBDAO(String basePath, String rocksDBBasePath) { this.basePath = basePath; - this.config = config; - this.rocksDBBasePath = String.format("%s/%s/%s", config.getRocksdbBasePath(), + this.rocksDBBasePath = String.format("%s/%s/%s", rocksDBBasePath, this.basePath.replace("/", "_"), UUID.randomUUID().toString()); init(); } @@ -95,6 +92,7 @@ public class RocksDBDAO { managedDescriptorMap = new ConcurrentHashMap<>(); // If already present, loads the existing column-family handles + final DBOptions dbOptions = new 
DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true) .setWalDir(rocksDBBasePath).setStatsDumpPeriodSec(300).setStatistics(new Statistics()); dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) { @@ -184,6 +182,26 @@ public class RocksDBDAO { } /** + * Helper to add put operation in batch + * + * @param batch Batch Handle + * @param columnFamilyName Column Family + * @param key Key + * @param value Payload + * @param <T> Type of payload + */ + public <K extends Serializable, T extends Serializable> void putInBatch(WriteBatch batch, String columnFamilyName, + K key, T value) { + try { + byte[] keyBytes = SerializationUtils.serialize(key); + byte[] payload = SerializationUtils.serialize(value); + batch.put(managedHandlesMap.get(columnFamilyName), keyBytes, payload); + } catch (Exception e) { + throw new HoodieException(e); + } + } + + /** * Perform single PUT on a column-family * * @param columnFamilyName Column family name @@ -201,6 +219,23 @@ public class RocksDBDAO { } /** + * Perform single PUT on a column-family + * + * @param columnFamilyName Column family name + * @param key Key + * @param value Payload + * @param <T> Type of Payload + */ + public <K extends Serializable, T extends Serializable> void put(String columnFamilyName, K key, T value) { + try { + byte[] payload = SerializationUtils.serialize(value); + getRocksDB().put(managedHandlesMap.get(columnFamilyName), SerializationUtils.serialize(key), payload); + } catch (Exception e) { + throw new HoodieException(e); + } + } + + /** * Helper to add delete operation in batch * * @param batch Batch Handle @@ -216,6 +251,21 @@ public class RocksDBDAO { } /** + * Helper to add delete operation in batch + * + * @param batch Batch Handle + * @param columnFamilyName Column Family + * @param key Key + */ + public <K extends Serializable> void deleteInBatch(WriteBatch batch, String columnFamilyName, K key) { + try { + batch.delete(managedHandlesMap.get(columnFamilyName), SerializationUtils.serialize(key)); + } catch (Exception e) { + throw new HoodieException(e); + } + } + + /** * Perform a single Delete operation * * @param columnFamilyName Column Family name @@ -230,6 +280,20 @@ public class RocksDBDAO { } /** + * Perform a single Delete operation + * + * @param columnFamilyName Column Family name + * @param key Key to be deleted + */ + public <K extends Serializable> void delete(String columnFamilyName, K key) { + try { + getRocksDB().delete(managedHandlesMap.get(columnFamilyName), SerializationUtils.serialize(key)); + } catch (Exception e) { + throw new HoodieException(e); + } + } + + /** * Retrieve a value for a given key in a column family * * @param columnFamilyName Column Family Name @@ -247,6 +311,23 @@ public class RocksDBDAO { } /** + * Retrieve a value for a given key in a column family + * + * @param columnFamilyName Column Family Name + * @param key Key to be retrieved + * @param <T> Type of object stored. + */ + public <K extends Serializable, T extends Serializable> T get(String columnFamilyName, K key) { + Preconditions.checkArgument(!closed); + try { + byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), SerializationUtils.serialize(key)); + return val == null ? 
null : SerializationUtils.deserialize(val); + } catch (Exception e) { + throw new HoodieException(e); + } + } + + /** * Perform a prefix search and return stream of key-value pairs retrieved * * @param columnFamilyName Column Family Name diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java index 34e2276..6b617a0 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java @@ -51,7 +51,7 @@ import org.apache.log4j.Logger; * without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata. * 2) Current position in the file NOTE : Only String.class type supported for Key */ -public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R> { +public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> { private static final Logger log = LogManager.getLogger(DiskBasedMap.class); // Stores the key and corresponding value's latest metadata spilled to disk @@ -149,6 +149,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable> /** * Custom iterator to iterate over values written to disk */ + @Override public Iterator<R> iterator() { return new LazyFileIterable(filePath, valueMetadataMap).iterator(); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/RocksDBBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/RocksDBBasedMap.java new file mode 100644 index 0000000..267723f --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/RocksDBBasedMap.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.common.util.collection; + +import com.uber.hoodie.common.util.RocksDBDAO; +import com.uber.hoodie.exception.HoodieNotSupportedException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +public final class RocksDBBasedMap<K extends Serializable, R extends Serializable> implements Map<K, R> { + + private static final String COL_FAMILY_NAME = "map_handle"; + + private final String rocksDbStoragePath; + private RocksDBDAO rocksDBDAO; + private final String columnFamilyName; + + public RocksDBBasedMap(String rocksDbStoragePath) { + this.rocksDbStoragePath = rocksDbStoragePath; + this.columnFamilyName = COL_FAMILY_NAME; + } + + @Override + public int size() { + return (int)getRocksDBDAO().prefixSearch(columnFamilyName, "").count(); + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public boolean containsKey(Object key) { + // Wont be able to store nulls as values + return getRocksDBDAO().get(columnFamilyName, key.toString()) != null; + } + + @Override + public boolean containsValue(Object value) { + throw new HoodieNotSupportedException("Not Supported"); + } + + @Override + public R get(Object key) { + return getRocksDBDAO().get(columnFamilyName, (Serializable)key); + } + + @Override + public R put(K key, R value) { + getRocksDBDAO().put(columnFamilyName, key, value); + return value; + } + + @Override + public R remove(Object key) { + R val = getRocksDBDAO().get(columnFamilyName, key.toString()); + getRocksDBDAO().delete(columnFamilyName, key.toString()); + return val; + } + + @Override + public void putAll(Map<? extends K, ? extends R> m) { + getRocksDBDAO().writeBatch(batch -> { + m.entrySet().forEach(entry -> { + getRocksDBDAO().putInBatch(batch, columnFamilyName, entry.getKey(), entry.getValue()); + }); + }); + } + + private RocksDBDAO getRocksDBDAO() { + if (null == rocksDBDAO) { + rocksDBDAO = new RocksDBDAO("default", rocksDbStoragePath); + rocksDBDAO.addColumnFamily(columnFamilyName); + } + return rocksDBDAO; + } + + @Override + public void clear() { + if (null != rocksDBDAO) { + rocksDBDAO.close(); + } + rocksDBDAO = null; + } + + @Override + public Set<K> keySet() { + throw new HoodieNotSupportedException("Not Supported"); + } + + @Override + public Collection<R> values() { + throw new HoodieNotSupportedException("Not Supported"); + } + + @Override + public Set<Entry<K, R>> entrySet() { + throw new HoodieNotSupportedException("Not Supported"); + } + + public Iterator<R> iterator() { + return getRocksDBDAO().prefixSearch(columnFamilyName, "") + .map(p -> (R)(p.getValue())).iterator(); + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRocksDBManager.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRocksDBManager.java index 0c7bf31..dd239e1 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRocksDBManager.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRocksDBManager.java @@ -67,7 +67,7 @@ public class TestRocksDBManager { }).collect(Collectors.toList()); dbManager = new RocksDBDAO("/dummy/path", - FileSystemViewStorageConfig.newBuilder().build().newBuilder().build()); + FileSystemViewStorageConfig.newBuilder().build().newBuilder().build().getRocksdbBasePath()); colFamilies.stream().forEach(family -> dbManager.dropColumnFamily(family)); colFamilies.stream().forEach(family -> dbManager.addColumnFamily(family)); diff --git 
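
A hypothetical usage of the new RocksDBBasedMap, which opens its RocksDB instance lazily on first access and round-trips values through the typed put/get/delete overloads added to RocksDBDAO earlier in this diff; the storage path and the record variable are made up for illustration:

    Map<String, HoodieRecord> spillMap = new RocksDBBasedMap<>("/tmp/rocksdb-map");
    spillMap.put(record.getRecordKey(), record);      // serialized into RocksDB
    HoodieRecord roundTripped = spillMap.get(record.getRecordKey());
    spillMap.clear();                                 // closes the RocksDB instance
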
a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestRocksDbBasedMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestRocksDbBasedMap.java new file mode 100644 index 0000000..e91625d --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestRocksDbBasedMap.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.util.collection; + +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.SchemaTestUtil; +import com.uber.hoodie.common.util.SpillableMapTestUtils; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.junit.Assert; +import org.junit.Test; + +public class TestRocksDbBasedMap { + + private static final String BASE_OUTPUT_PATH = "/tmp/"; + + @Test + public void testSimple() throws IOException, URISyntaxException { + RocksDBBasedMap records = new RocksDBBasedMap(BASE_OUTPUT_PATH); + List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); + ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); + List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); + + // make sure records have spilled to disk + Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator(); + List<HoodieRecord> oRecords = new ArrayList<>(); + while (itr.hasNext()) { + HoodieRecord<? 
extends HoodieRecordPayload> rec = itr.next(); + oRecords.add(rec); + assert recordKeys.contains(rec.getRecordKey()); + } + Assert.assertEquals(recordKeys.size(), oRecords.size()); + } +} diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java index 3401266..e7b9494 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java @@ -18,6 +18,7 @@ package com.uber.hoodie; +import com.uber.hoodie.client.embedded.EmbeddedTimelineService; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; @@ -182,10 +184,10 @@ public class DataSourceUtils { @SuppressWarnings("unchecked") public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords, - HoodieWriteConfig writeConfig) throws Exception { + HoodieWriteConfig writeConfig, Optional<EmbeddedTimelineService> timelineService) throws Exception { HoodieReadClient client = null; try { - client = new HoodieReadClient<>(jssc, writeConfig); + client = new HoodieReadClient<>(jssc, writeConfig, timelineService); return client.tagLocation(incomingHoodieRecords) .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown()); } catch (DatasetNotFoundException e) { @@ -202,12 +204,14 @@ public class DataSourceUtils { @SuppressWarnings("unchecked") public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords, - Map<String, String> parameters) throws Exception { + Map<String, String> parameters, + Optional<EmbeddedTimelineService> timelineService) + throws Exception { HoodieWriteConfig writeConfig = HoodieWriteConfig .newBuilder() .withPath(parameters.get("path")) .withProps(parameters).build(); - return dropDuplicates(jssc, incomingHoodieRecords, writeConfig); + return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService); } public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) { diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala index e4f6fc3..35c19aa 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf import org.apache.log4j.LogManager import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer @@ -98,21 +98,6 @@ private[hoodie] object HoodieSparkSqlWriter { val jsc = new JavaSparkContext(sparkContext) - val hoodieRecords = - if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) { - DataSourceUtils.dropDuplicates( - jsc, - hoodieAllIncomingRecords, - mapAsJavaMap(parameters)) - } else { - hoodieAllIncomingRecords - } - - if (hoodieRecords.isEmpty()) { - 
log.info("new batch has no new records, skipping...") - return (true, None) - } - val basePath = new Path(parameters("path")) val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) var exists = fs.exists(basePath) @@ -141,6 +126,22 @@ private[hoodie] object HoodieSparkSqlWriter { val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get, mapAsJavaMap(parameters) ) + + val hoodieRecords = + if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) { + DataSourceUtils.dropDuplicates( + jsc, + hoodieAllIncomingRecords, + mapAsJavaMap(parameters), client.getTimelineServer) + } else { + hoodieAllIncomingRecords + } + + if (hoodieRecords.isEmpty()) { + log.info("new batch has no new records, skipping...") + return (true, None) + } + val commitTime = client.startCommit() val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation) diff --git a/hoodie-spark/src/test/scala/DataSourceTest.scala b/hoodie-spark/src/test/scala/DataSourceTest.scala index 6a93498..66c0ac7 100644 --- a/hoodie-spark/src/test/scala/DataSourceTest.scala +++ b/hoodie-spark/src/test/scala/DataSourceTest.scala @@ -236,8 +236,9 @@ class DataSourceTest extends AssertionsForJUnit { inputDF2.write.mode(SaveMode.Append).json(sourcePath) // wait for spark streaming to process one microbatch - Thread.sleep(3000) + Thread.sleep(10000) val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, destPath) + assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) // Read RO View val hoodieROViewDF2 = spark.read.format("com.uber.hoodie") diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml index 4179837..2625cc9 100644 --- a/hoodie-utilities/pom.xml +++ b/hoodie-utilities/pom.xml @@ -44,6 +44,18 @@ <target>1.8</target> </configuration> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>3.1.2</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> <resources> @@ -218,10 +230,13 @@ <artifactId>commons-dbcp</artifactId> </dependency> <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> </dependency> - <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java index 730777c..9e6204e 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java @@ -37,7 +37,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.StringReader; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -132,7 +134,11 @@ public class UtilHelpers { } private static SparkConf buildSparkConf(String appName, String defaultMaster) { - SparkConf sparkConf = new SparkConf().setAppName(appName); + return buildSparkConf(appName, defaultMaster, new HashMap<>()); + } + + private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) { + 
final SparkConf sparkConf = new SparkConf().setAppName(appName); String master = sparkConf.get("spark.master", defaultMaster); sparkConf.setMaster(master); if (master.startsWith("yarn")) { @@ -147,8 +153,13 @@ public class UtilHelpers { "org.apache.hadoop.io.compress.GzipCodec"); sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); - sparkConf = HoodieWriteClient.registerClasses(sparkConf); - return sparkConf; + additionalConfigs.entrySet().forEach(e -> sparkConf.set(e.getKey(), e.getValue())); + SparkConf newSparkConf = HoodieWriteClient.registerClasses(sparkConf); + return newSparkConf; + } + + public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map<String, String> configs) { + return new JavaSparkContext(buildSparkConf(appName, defaultMaster, configs)); } public static JavaSparkContext buildSparkContext(String appName, String defaultMaster) { diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/AbstractDeltaStreamerService.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/AbstractDeltaStreamerService.java new file mode 100644 index 0000000..179fdbd --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/AbstractDeltaStreamerService.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.utilities.deltastreamer; + +import com.uber.hoodie.common.util.collection.Pair; +import java.io.Serializable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Base Class for running delta-sync/compaction in separate thread and controlling their life-cyle + */ +public abstract class AbstractDeltaStreamerService implements Serializable { + + protected static volatile Logger log = LogManager.getLogger(AbstractDeltaStreamerService.class); + + // Flag to track if the service is started. 
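
The new buildSparkConf/buildSparkContext overloads exist so a caller such as HoodieDeltaStreamer (with the SchedulerConfGenerator listed in the diffstat) can inject scheduler settings before the context starts. A hypothetical usage; the fair-scheduler property is standard Spark configuration, not something defined in this diff:

    Map<String, String> extraConfigs = new HashMap<>();
    extraConfigs.put("spark.scheduler.mode", "FAIR"); // separate pools for ingest/compaction
    JavaSparkContext jssc =
        UtilHelpers.buildSparkContext("delta-streamer-continuous", "yarn", extraConfigs);
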
+ private boolean started; + // Flag indicating shutdown is externally requested + private boolean shutdownRequested; + // Flag indicating the service is shutdown + private volatile boolean shutdown; + // Executor Service for running delta-sync/compaction + private transient ExecutorService executor; + // Future tracking delta-sync/compaction + private transient CompletableFuture future; + + AbstractDeltaStreamerService() { + shutdownRequested = false; + } + + boolean isShutdownRequested() { + return shutdownRequested; + } + + boolean isShutdown() { + return shutdown; + } + + /** + * Wait till the service shutdown. If the service shutdown with exception, it will be thrown + * @throws ExecutionException + * @throws InterruptedException + */ + void waitForShutdown() throws ExecutionException, InterruptedException { + try { + future.get(); + } catch (ExecutionException ex) { + log.error("Service shutdown with error", ex); + throw ex; + } + } + + /** + * Request shutdown either forcefully or gracefully. Graceful shutdown allows the service to finish up the current + * round of work and shutdown. For graceful shutdown, it waits till the service is shutdown + * @param force Forcefully shutdown + */ + void shutdown(boolean force) { + if (!shutdownRequested || force) { + shutdownRequested = true; + if (executor != null) { + if (force) { + executor.shutdownNow(); + } else { + executor.shutdown(); + try { + // Wait for some max time after requesting shutdown + executor.awaitTermination(24, TimeUnit.HOURS); + } catch (InterruptedException ie) { + log.error("Interrupted while waiting for shutdown", ie); + } + } + } + } + } + + /** + * Start the service. Runs the service in a different thread and returns. Also starts a monitor thread + * to run-callbacks in case of shutdown + * @param onShutdownCallback + */ + public void start(Function<Boolean, Boolean> onShutdownCallback) { + Pair<CompletableFuture, ExecutorService> res = startService(); + future = res.getKey(); + executor = res.getValue(); + started = true; + monitorThreads(onShutdownCallback); + } + + /** + * Service implementation + * @return + */ + protected abstract Pair<CompletableFuture, ExecutorService> startService(); + + /** + * A monitor thread is started which would trigger a callback if the service is shutdown + * @param onShutdownCallback + */ + private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) { + log.info("Submitting monitor thread !!"); + Executors.newSingleThreadExecutor().submit(() -> { + boolean error = false; + try { + log.info("Monitoring thread(s) !!"); + future.get(); + } catch (ExecutionException ex) { + log.error("Monitor noticed one or more threads failed." + + " Requesting graceful shutdown of other threads", ex); + error = true; + shutdown(false); + } catch (InterruptedException ie) { + log.error("Got interrupted Monitoring threads", ie); + error = true; + shutdown(false); + } finally { + // Mark as shutdown + shutdown = true; + onShutdownCallback.apply(error); + } + }); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/Compactor.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/Compactor.java new file mode 100644 index 0000000..d72d7da --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/Compactor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
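
A sketch of how a concrete service built on this base class is driven; startService() is the abstract hook, the factory method below is hypothetical, and the caller must live in the same package since waitForShutdown/shutdown are package-private:

    AbstractDeltaStreamerService service = createDeltaSyncService(); // hypothetical factory
    service.start(errored -> {
      log.info("Service shut down" + (errored ? " with error" : " cleanly"));
      return true;
    });
    try {
      service.waitForShutdown(); // rethrows any failure from the service thread
    } catch (Exception e) {
      service.shutdown(true);    // force-stop on error
    }
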
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.utilities.deltastreamer; + +import com.uber.hoodie.HoodieWriteClient; +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.exception.HoodieException; +import java.io.IOException; +import java.io.Serializable; +import java.util.Optional; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Run one round of compaction + */ +public class Compactor implements Serializable { + + protected static volatile Logger log = LogManager.getLogger(Compactor.class); + + private transient HoodieWriteClient compactionClient; + private transient JavaSparkContext jssc; + + public Compactor(HoodieWriteClient compactionClient, JavaSparkContext jssc) { + this.jssc = jssc; + this.compactionClient = compactionClient; + } + + public void compact(HoodieInstant instant) throws IOException { + log.info("Compactor executing compaction " + instant); + JavaRDD<WriteStatus> res = compactionClient.compact(instant.getTimestamp()); + long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count(); + if (numWriteErrors != 0) { + // We treat even a single error in compaction as fatal + log.error("Compaction for instant (" + instant + ") failed with write errors. " + + "Errors :" + numWriteErrors); + throw new HoodieException("Compaction for instant (" + instant + ") failed with write errors. 
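
Combined with the CompactionUtils.getPendingCompactionInstantTimes helper added earlier in this diff, one compaction round could look like the fragment below; the loop structure is illustrative, not the DeltaStreamer's actual scheduling, and the enclosing method is assumed to declare throws IOException:

    // Drain all pending compactions; a single write error fails the round.
    Compactor compactor = new Compactor(writeClient, jssc);
    for (HoodieInstant instant : CompactionUtils.getPendingCompactionInstantTimes(metaClient)) {
      compactor.compact(instant); // commits the compaction on success
    }
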
" + + "Errors :" + numWriteErrors); + } + // Commit compaction + compactionClient.commitCompaction(instant.getTimestamp(), res, Optional.empty()); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java similarity index 52% copy from hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java copy to hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java index 7e76ff5..c372b3c 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java @@ -21,17 +21,11 @@ package com.uber.hoodie.utilities.deltastreamer; import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME; -import com.beust.jcommander.IStringConverter; -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.ParameterException; import com.codahale.metrics.Timer; import com.uber.hoodie.AvroConversionUtils; import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.KeyGenerator; -import com.uber.hoodie.OverwriteWithLatestAvroPayload; -import com.uber.hoodie.SimpleKeyGenerator; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieRecord; @@ -40,29 +34,29 @@ import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.hive.HiveSyncConfig; import com.uber.hoodie.hive.HiveSyncTool; import com.uber.hoodie.index.HoodieIndex; -import com.uber.hoodie.utilities.HiveIncrementalPuller; import com.uber.hoodie.utilities.UtilHelpers; +import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; import com.uber.hoodie.utilities.schema.RowBasedSchemaProvider; import com.uber.hoodie.utilities.schema.SchemaProvider; import com.uber.hoodie.utilities.sources.InputBatch; -import com.uber.hoodie.utilities.sources.JsonDFSSource; import com.uber.hoodie.utilities.transform.Transformer; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Optional; +import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; @@ -78,19 +72,19 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.collection.JavaConversions; + /** - * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply - * it to the target 
dataset. Does not maintain any state, queries at runtime to see how far behind - * the target dataset is from the source dataset. This can be overriden to force sync from a - * timestamp. + * Syncs one batch of data to a hoodie dataset */ -public class HoodieDeltaStreamer implements Serializable { - - private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class); +public class DeltaSync implements Serializable { + protected static volatile Logger log = LogManager.getLogger(DeltaSync.class); public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; - private final Config cfg; + /** + * Delta Sync Config + */ + private final HoodieDeltaStreamer.Config cfg; /** * Source to pull deltas from @@ -98,8 +92,7 @@ public class HoodieDeltaStreamer implements Serializable { private transient SourceFormatAdapter formatAdapter; /** - * Schema provider that supplies the command for reading the input and writing out the target - * table. + * Schema provider that supplies the command for reading the input and writing out the target table. */ private transient SchemaProvider schemaProvider; @@ -119,11 +112,6 @@ public class HoodieDeltaStreamer implements Serializable { private transient FileSystem fs; /** - * Timeline with completed commits - */ - private transient Optional<HoodieTimeline> commitTimelineOpt; - - /** * Spark context */ private transient JavaSparkContext jssc; @@ -141,49 +129,114 @@ public class HoodieDeltaStreamer implements Serializable { /** * Bag of properties with source, hoodie client, key generator etc. */ - TypedProperties props; + private final TypedProperties props; - public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException { - this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), - getDefaultHiveConf(jssc.hadoopConfiguration())); - } + /** + * Callback when write client is instantiated + */ + private transient Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient; + + /** + * Timeline with completed commits + */ + private transient Optional<HoodieTimeline> commitTimelineOpt; + + /** + * Write Client + */ + private transient HoodieWriteClient writeClient; + + /** + * Table Type + */ + private final HoodieTableType tableType; + + public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, + SchemaProvider schemaProvider, HoodieTableType tableType, TypedProperties props, + JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf, + Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) + throws IOException { - public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException { this.cfg = cfg; this.jssc = jssc; - this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); - this.fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()); + this.sparkSession = sparkSession; + this.fs = fs; + this.tableType = tableType; + this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient; + this.props = props; + log.info("Creating delta streamer with configs : " + props.toString()); + this.schemaProvider = schemaProvider; + + refreshTimeline(); + this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName); + this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props); + + this.formatAdapter = new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc, + sparkSession, schemaProvider)); + + this.hiveConf = hiveConf; + if
(cfg.filterDupes) { + cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; + } + + // If schemaRegistry already resolved, setup write-client + setupWriteClient(); + } + + /** + * Refresh Timeline + */ + private void refreshTimeline() throws IOException { if (fs.exists(new Path(cfg.targetBasePath))) { - HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), cfg.targetBasePath); + HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath); this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants()); } else { this.commitTimelineOpt = Optional.empty(); + HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, + cfg.storageType, cfg.targetTableName, "archived"); } + } - this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); - log.info("Creating delta streamer with configs : " + props.toString()); - this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); - this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName); - this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props); + /** + * Run one round of delta sync and return new compaction instant if one got scheduled + */ + public Optional<String> syncOnce() throws Exception { + Optional<String> scheduledCompaction = Optional.empty(); + HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider)); + Timer.Context overallTimerContext = metrics.getOverallTimerContext(); - this.formatAdapter = - new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, - schemaProvider)); + // Refresh Timeline + refreshTimeline(); - this.hiveConf = hiveConf; - } + Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = + readFromSource(commitTimelineOpt); + + if (null != srcRecordsWithCkpt) { + // this is the first input batch. 
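The write client cannot always be built eagerly: sources that return Dataset<Row> only yield a schema once the first batch is fetched, which is why setupWriteClient() is invoked both in the constructor and again from syncOnce() once a schema provider materializes. A minimal sketch of this deferred-initialization pattern, with illustrative field names rather than the patch's exact code:

    // Sketch: defer client creation until a target schema is known.
    // HoodieWriteClient and HoodieWriteConfig are the real APIs used in this
    // patch; the surrounding fields (writeClient, cfg, jssc) are illustrative.
    private void setupWriteClientIfNeeded(SchemaProvider resolved) {
      if (writeClient == null && resolved != null) {
        HoodieWriteConfig clientCfg = HoodieWriteConfig.newBuilder()
            .withPath(cfg.targetBasePath)
            .withSchema(resolved.getTargetSchema().toString())
            .build();
        writeClient = new HoodieWriteClient<>(jssc, clientCfg, true);
      }
    }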
If schemaProvider not set, use it and register Avro Schema and start + // compactor + if (null == schemaProvider) { + // Set the schemaProvider if not user-provided + this.schemaProvider = srcRecordsWithCkpt.getKey(); + // Setup HoodieWriteClient and compaction now that we decided on schema + setupWriteClient(); + } + + scheduledCompaction = writeToSink(srcRecordsWithCkpt.getRight().getRight(), + srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext); + } - private static HiveConf getDefaultHiveConf(Configuration cfg) { - HiveConf hiveConf = new HiveConf(); - hiveConf.addResource(cfg); - return hiveConf; + // Clear persistent RDDs + jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist); + return scheduledCompaction; } - public void sync() throws Exception { - HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(null)); - Timer.Context overallTimerContext = metrics.getOverallTimerContext(); + /** + * Read from Upstream Source and apply transformation if needed + */ + private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource( + Optional<HoodieTimeline> commitTimelineOpt) throws Exception { // Retrieve the previous round checkpoints, if any Optional<String> resumeCheckpointStr = Optional.empty(); if (commitTimelineOpt.isPresent()) { @@ -200,7 +253,7 @@ public class HoodieDeltaStreamer implements Serializable { } } } else { - HoodieTableMetaClient.initTableType(jssc.hadoopConfiguration(), cfg.targetBasePath, + HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, cfg.storageType, cfg.targetTableName, "archived"); } log.info("Checkpoint to resume from : " + resumeCheckpointStr); @@ -218,11 +271,11 @@ public class HoodieDeltaStreamer implements Serializable { dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props)); checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch(); avroRDDOptional = transformed.map(t -> - AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() + AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() ); // Use Transformed Row's schema if not overridden schemaProvider = - this.schemaProvider == null ? transformed.map(r -> (SchemaProvider)new RowBasedSchemaProvider(r.schema())) + this.schemaProvider == null ? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())) .orElse(dataAndCheckpoint.getSchemaProvider()) : this.schemaProvider; } else { // Pull the data from the source & prepare the write @@ -235,43 +288,54 @@ public class HoodieDeltaStreamer implements Serializable { if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) { log.info("No new data, nothing to commit.. "); - return; + return null; } - registerAvroSchemas(schemaProvider); - JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get(); JavaRDD<HoodieRecord> records = avroRDD.map(gr -> { HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, - (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField)); + (Comparable) gr.get(cfg.sourceOrderingField)); return new HoodieRecord<>(keyGenerator.getKey(gr), payload); }); + return Pair.of(schemaProvider, Pair.of(checkpointStr, records)); + } + + /** + * Perform Hoodie Write. 
Run Cleaner, schedule compaction and syncs to hive if needed + * + * @param records Input Records + * @param checkpointStr Checkpoint String + * @param metrics Metrics + * @return Optional Compaction instant if one is scheduled + */ + private Optional<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr, + HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception { + + Optional<String> scheduledCompactionInstant = Optional.empty(); // filter dupes if needed - HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider); if (cfg.filterDupes) { // turn upserts to insert cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; - records = DataSourceUtils.dropDuplicates(jssc, records, hoodieCfg); + records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(), + writeClient.getTimelineServer()); if (records.isEmpty()) { log.info("No new data, nothing to commit.. "); - return; + return Optional.empty(); } } - // Perform the write - HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, true); - String commitTime = client.startCommit(); + String commitTime = startCommit(); log.info("Starting commit : " + commitTime); JavaRDD<WriteStatus> writeStatusRDD; if (cfg.operation == Operation.INSERT) { - writeStatusRDD = client.insert(records, commitTime); + writeStatusRDD = writeClient.insert(records, commitTime); } else if (cfg.operation == Operation.UPSERT) { - writeStatusRDD = client.upsert(records, commitTime); + writeStatusRDD = writeClient.upsert(records, commitTime); } else if (cfg.operation == Operation.BULK_INSERT) { - writeStatusRDD = client.bulkInsert(records, commitTime); + writeStatusRDD = writeClient.bulkInsert(records, commitTime); } else { throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation); } @@ -289,19 +353,27 @@ public class HoodieDeltaStreamer implements Serializable { + totalErrorRecords + "/" + totalRecords); } - boolean success = client.commit(commitTime, writeStatusRDD, + boolean success = writeClient.commit(commitTime, writeStatusRDD, Optional.of(checkpointCommitMetadata)); if (success) { log.info("Commit " + commitTime + " successful!"); + + // Schedule compaction if needed + if (tableType.equals(HoodieTableType.MERGE_ON_READ) && cfg.continuousMode) { + scheduledCompactionInstant = writeClient + .scheduleCompaction(Optional.of(checkpointCommitMetadata)); + } + // Sync to hive if enabled Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext(); syncHive(); hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0; } else { log.info("Commit " + commitTime + " failed!"); + throw new HoodieException("Commit " + commitTime + " failed!"); } } else { - log.error("There are errors when ingesting records. Errors/Total=" + log.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords); log.error("Printing out the top 100 errors"); writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> { @@ -311,15 +383,43 @@ public class HoodieDeltaStreamer implements Serializable { log.trace("Error for key:" + r.getKey() + " is " + r.getValue())); } }); + // Rolling back instant + writeClient.rollback(commitTime); + throw new HoodieException("Commit " + commitTime + " failed and rolled-back !"); } - client.close(); long overallTimeMs = overallTimerContext != null ? 
overallTimerContext.stop() : 0; // Send DeltaStreamer Metrics metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs); + + return scheduledCompactionInstant; } - public void syncHive() { + private String startCommit() { + final int maxRetries = 2; + int retryNum = 1; + RuntimeException lastException = null; + while (retryNum <= maxRetries) { + try { + return writeClient.startCommit(); + } catch (IllegalArgumentException ie) { + lastException = ie; + log.error("Got error trying to start a new commit. Retrying after sleeping for a sec", ie); + retryNum++; + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + //No-Op + } + } + } + throw lastException; + } + + /** + * Sync to Hive + */ + private void syncHive() { if (cfg.enableHiveSync) { HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath); log.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName @@ -330,30 +430,38 @@ public class HoodieDeltaStreamer implements Serializable { } /** - * Register Avro Schemas - * @param schemaProvider Schema Provider + * Note that depending on configs and source-type, schemaProvider could either be eagerly or lazily created. + * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of + * this constraint. */ - private void registerAvroSchemas(SchemaProvider schemaProvider) { - // register the schemas, so that shuffle does not serialize the full schemas - if (null != schemaProvider) { - List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema()); - log.info("Registering Schema :" + schemas); - jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList()); + public void setupWriteClient() { + log.info("Setting up Hoodie Write Client"); + if ((null != schemaProvider) && (null == writeClient)) { + registerAvroSchemas(schemaProvider); + HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider); + writeClient = new HoodieWriteClient<>(jssc, hoodieCfg, true); + onInitializingHoodieWriteClient.apply(writeClient); } } + /** + * Helper to construct Write Client config + * + * @param schemaProvider Schema Provider + */ private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) { HoodieWriteConfig.Builder builder = - HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath) - .withAutoCommit(false).combineInput(cfg.filterDupes, true) + HoodieWriteConfig.newBuilder() + .withProps(props) + .withPath(cfg.targetBasePath) + .combineInput(cfg.filterDupes, true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withPayloadClass(cfg.payloadClassName) - // turn on inline compaction by default, for MOR tables - .withInlineCompaction(HoodieTableType.valueOf(cfg.storageType) == HoodieTableType.MERGE_ON_READ) - .build()) + // Inline compaction is disabled for continuous mode. 
otherwise enabled for MOR + .withInlineCompaction(!cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)).build()) .forTable(cfg.targetTableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .withProps(props); + .withAutoCommit(false); if (null != schemaProvider) { builder = builder.withSchema(schemaProvider.getTargetSchema().toString()); } @@ -361,151 +469,27 @@ public class HoodieDeltaStreamer implements Serializable { return builder.build(); } - public enum Operation { - UPSERT, INSERT, BULK_INSERT - } - - private static class OperationConvertor implements IStringConverter<Operation> { - @Override - public Operation convert(String value) throws ParameterException { - return Operation.valueOf(value); + /** + * Register Avro Schemas + * + * @param schemaProvider Schema Provider + */ + private void registerAvroSchemas(SchemaProvider schemaProvider) { + // register the schemas, so that shuffle does not serialize the full schemas + if (null != schemaProvider) { + List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema()); + log.info("Registering Schema :" + schemas); + jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList()); } } - public static class Config implements Serializable { - - @Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie dataset. " - + "(Will be created if did not exist first time around. If exists, expected to be a hoodie dataset)", - required = true) - public String targetBasePath; - - // TODO: How to obtain hive configs to register? - @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true) - public String targetTableName; - - @Parameter(names = {"--storage-type"}, description = "Type of Storage. " - + "COPY_ON_WRITE (or) MERGE_ON_READ", required = true) - public String storageType; - - @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " - + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are " - + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer" - + "to individual classes, for supported properties.") - public String propsFilePath = - "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties"; - - @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " - + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter") - public List<String> configs = new ArrayList<>(); - - @Parameter(names = {"--source-class"}, description = "Subclass of com.uber.hoodie.utilities.sources to read data. " - + "Built-in options: com.uber.hoodie.utilities.sources.{JsonDFSSource (default), AvroDFSSource, " - + "JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}") - public String sourceClassName = JsonDFSSource.class.getName(); - - @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how" - + " to break ties between records with same key in input data. 
Default: 'ts' holding unix timestamp of record") - public String sourceOrderingField = "ts"; - - @Parameter(names = {"--key-generator-class"}, description = "Subclass of com.uber.hoodie.KeyGenerator " - + "to generate a HoodieKey from the given avro record. Built in: SimpleKeyGenerator (uses " - + "provided field names as recordkey & partitionpath. Nested fields specified via dot notation, e.g: a.b.c)") - public String keyGeneratorClass = SimpleKeyGenerator.class.getName(); - - @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off " - + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value") - public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); - - @Parameter(names = {"--schemaprovider-class"}, description = "subclass of com.uber.hoodie.utilities.schema" - + ".SchemaProvider to attach schemas to input & target table data, built in options: " - + "com.uber.hoodie.utilities.schema.FilebasedSchemaProvider." - + "Source (See com.uber.hoodie.utilities.sources.Source) implementation can implement their own SchemaProvider." - + " For Sources that return Dataset<Row>, the schema is obtained implicitly. " - + "However, this CLI option allows overriding the schemaprovider returned by Source.") - public String schemaProviderClassName = null; - - @Parameter(names = {"--transformer-class"}, - description = "subclass of com.uber.hoodie.utilities.transform.Transformer" - + ". Allows transforming raw source dataset to a target dataset (conforming to target schema) before writing." - + " Default : Not set. E:g - com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer (which allows" - + "a SQL query templated to be passed as a transformation function)") - public String transformerClassName = null; - - @Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. 
" - + "Default: No limit For e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read") - public long sourceLimit = Long.MAX_VALUE; - - @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " - + "is purely new data/inserts to gain speed)", - converter = OperationConvertor.class) - public Operation operation = Operation.UPSERT; - - @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out" - + "before insert/bulk-insert") - public Boolean filterDupes = false; - - @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive") - public Boolean enableHiveSync = false; - - @Parameter(names = {"--spark-master"}, description = "spark master to use.") - public String sparkMaster = "local[2]"; - - @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written") - public Boolean commitOnErrors = false; - - @Parameter(names = {"--help", "-h"}, help = true) - public Boolean help = false; - } - - public static void main(String[] args) throws Exception { - final Config cfg = new Config(); - JCommander cmd = new JCommander(cfg, args); - if (cfg.help || args.length == 0) { - cmd.usage(); - System.exit(1); + /** + * Close all resources + */ + public void close() { + if (null != writeClient) { + writeClient.close(); + writeClient = null; } - - JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster); - new HoodieDeltaStreamer(cfg, jssc).sync(); - } - - public SourceFormatAdapter getFormatAdapter() { - return formatAdapter; - } - - public SchemaProvider getSchemaProvider() { - return schemaProvider; - } - - public Transformer getTransformer() { - return transformer; - } - - public KeyGenerator getKeyGenerator() { - return keyGenerator; - } - - public FileSystem getFs() { - return fs; - } - - public Optional<HoodieTimeline> getCommitTimelineOpt() { - return commitTimelineOpt; - } - - public JavaSparkContext getJssc() { - return jssc; - } - - public SparkSession getSparkSession() { - return sparkSession; - } - - public HiveConf getHiveConf() { - return hiveConf; - } - - public TypedProperties getProps() { - return props; } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index 7e76ff5..3342bff 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -18,71 +18,68 @@ package com.uber.hoodie.utilities.deltastreamer; -import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; -import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME; +import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION; import com.beust.jcommander.IStringConverter; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; -import com.codahale.metrics.Timer; -import com.uber.hoodie.AvroConversionUtils; -import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.HoodieWriteClient; -import com.uber.hoodie.KeyGenerator; import com.uber.hoodie.OverwriteWithLatestAvroPayload; import com.uber.hoodie.SimpleKeyGenerator; 
-import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.model.HoodieCommitMetadata; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; +import com.uber.hoodie.common.util.CompactionUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.TypedProperties; -import com.uber.hoodie.config.HoodieCompactionConfig; -import com.uber.hoodie.config.HoodieIndexConfig; -import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.hive.HiveSyncConfig; -import com.uber.hoodie.hive.HiveSyncTool; -import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.common.util.collection.Pair; +import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.utilities.HiveIncrementalPuller; import com.uber.hoodie.utilities.UtilHelpers; -import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; -import com.uber.hoodie.utilities.schema.RowBasedSchemaProvider; import com.uber.hoodie.utilities.schema.SchemaProvider; -import com.uber.hoodie.utilities.sources.InputBatch; import com.uber.hoodie.utilities.sources.JsonDFSSource; -import com.uber.hoodie.utilities.transform.Transformer; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import scala.collection.JavaConversions; + /** - * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply - * it to the target dataset. Does not maintain any state, queries at runtime to see how far behind - * the target dataset is from the source dataset. This can be overriden to force sync from a - * timestamp. + * A utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target + * dataset. Does not maintain any state, queries at runtime to see how far behind the target dataset is from the source + * dataset. This can be overridden to force sync from a timestamp.
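The statelessness described here rests on each commit carrying its source checkpoint in the commit metadata under CHECKPOINT_KEY; a hedged sketch of the recovery read, mirroring the timeline APIs that readFromSource() in DeltaSync uses above:

    // Sketch: pull the checkpoint of the latest completed commit, if present.
    static Optional<String> lastCheckpoint(HoodieTimeline completedCommits) throws IOException {
      Optional<HoodieInstant> lastCommit = completedCommits.lastInstant();
      if (lastCommit.isPresent()) {
        HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
            completedCommits.getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
        return Optional.ofNullable(metadata.getMetadata(HoodieDeltaStreamer.CHECKPOINT_KEY));
      }
      return Optional.empty();
    }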
+ * + * In continuous mode, DeltaStreamer runs in loop-mode going through the below operations + * (a) pull-from-source + * (b) write-to-sink + * (c) Schedule Compactions if needed + * (d) Conditionally Sync to Hive + * each cycle. For MOR tables with continuous mode enabled, a separate compactor thread is allocated to execute + * compactions */ public class HoodieDeltaStreamer implements Serializable { @@ -90,58 +87,9 @@ public class HoodieDeltaStreamer implements Serializable { public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; - private final Config cfg; - - /** - * Source to pull deltas from - */ - private transient SourceFormatAdapter formatAdapter; - - /** - * Schema provider that supplies the command for reading the input and writing out the target - * table. - */ - private transient SchemaProvider schemaProvider; - - /** - * Allows transforming source to target dataset before writing - */ - private transient Transformer transformer; - - /** - * Extract the key for the target dataset - */ - private KeyGenerator keyGenerator; - - /** - * Filesystem used - */ - private transient FileSystem fs; - - /** - * Timeline with completed commits - */ - private transient Optional<HoodieTimeline> commitTimelineOpt; + private final transient Config cfg; - /** - * Spark context - */ - private transient JavaSparkContext jssc; - - /** - * Spark Session - */ - private transient SparkSession sparkSession; - - /** - * Hive Config - */ - private transient HiveConf hiveConf; - - /** - * Bag of properties with source, hoodie client, key generator etc. - */ - TypedProperties props; + private transient DeltaSyncService deltaSyncService; public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException { this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), @@ -150,29 +98,11 @@ public class HoodieDeltaStreamer implements Serializable { public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException { this.cfg = cfg; - this.jssc = jssc; - this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); - this.fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()); - - if (fs.exists(new Path(cfg.targetBasePath))) { - HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), cfg.targetBasePath); - this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitsTimeline() - .filterCompletedInstants()); - } else { - this.commitTimelineOpt = Optional.empty(); - } - - this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); - log.info("Creating delta streamer with configs : " + props.toString()); - this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); - this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName); - this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props); - - this.formatAdapter = - new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, - schemaProvider)); + this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf); + } - this.hiveConf = hiveConf; + public void shutdownGracefully() { + deltaSyncService.shutdown(false); } private static HiveConf getDefaultHiveConf(Configuration cfg) { @@ -181,184 +111,27 @@ public class HoodieDeltaStreamer implements Serializable { return hiveConf; } - public void sync() throws Exception { - HoodieDeltaStreamerMetrics metrics = new
HoodieDeltaStreamerMetrics(getHoodieClientConfig(null)); - Timer.Context overallTimerContext = metrics.getOverallTimerContext(); - // Retrieve the previous round checkpoints, if any - Optional<String> resumeCheckpointStr = Optional.empty(); - if (commitTimelineOpt.isPresent()) { - Optional<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant(); - if (lastCommit.isPresent()) { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class); - if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) { - resumeCheckpointStr = Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY)); - } else { - throw new HoodieDeltaStreamerException( - "Unable to find previous checkpoint. Please double check if this table " - + "was indeed built via delta streamer "); - } - } - } else { - HoodieTableMetaClient.initTableType(jssc.hadoopConfiguration(), cfg.targetBasePath, - cfg.storageType, cfg.targetTableName, "archived"); - } - log.info("Checkpoint to resume from : " + resumeCheckpointStr); - - final Optional<JavaRDD<GenericRecord>> avroRDDOptional; - final String checkpointStr; - final SchemaProvider schemaProvider; - if (transformer != null) { - // Transformation is needed. Fetch New rows in Row Format, apply transformation and then convert them - // to generic records for writing - InputBatch<Dataset<Row>> dataAndCheckpoint = formatAdapter.fetchNewDataInRowFormat( - resumeCheckpointStr, cfg.sourceLimit); - - Optional<Dataset<Row>> transformed = - dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props)); - checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch(); - avroRDDOptional = transformed.map(t -> - AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() - ); - // Use Transformed Row's schema if not overridden - schemaProvider = - this.schemaProvider == null ? transformed.map(r -> (SchemaProvider)new RowBasedSchemaProvider(r.schema())) - .orElse(dataAndCheckpoint.getSchemaProvider()) : this.schemaProvider; - } else { - // Pull the data from the source & prepare the write - InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint = - formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit); - avroRDDOptional = dataAndCheckpoint.getBatch(); - checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch(); - schemaProvider = dataAndCheckpoint.getSchemaProvider(); - } - - if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) { - log.info("No new data, nothing to commit.. "); - return; - } - - registerAvroSchemas(schemaProvider); - - JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get(); - JavaRDD<HoodieRecord> records = avroRDD.map(gr -> { - HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, - (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField)); - return new HoodieRecord<>(keyGenerator.getKey(gr), payload); - }); - - // filter dupes if needed - HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider); - if (cfg.filterDupes) { - // turn upserts to insert - cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; - records = DataSourceUtils.dropDuplicates(jssc, records, hoodieCfg); - - if (records.isEmpty()) { - log.info("No new data, nothing to commit.. 
"); - return; - } - } - - // Perform the write - HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, true); - String commitTime = client.startCommit(); - log.info("Starting commit : " + commitTime); - - JavaRDD<WriteStatus> writeStatusRDD; - if (cfg.operation == Operation.INSERT) { - writeStatusRDD = client.insert(records, commitTime); - } else if (cfg.operation == Operation.UPSERT) { - writeStatusRDD = client.upsert(records, commitTime); - } else if (cfg.operation == Operation.BULK_INSERT) { - writeStatusRDD = client.bulkInsert(records, commitTime); - } else { - throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation); - } - - long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalErrorRecords()).sum().longValue(); - long totalRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalRecords()).sum().longValue(); - boolean hasErrors = totalErrorRecords > 0; - long hiveSyncTimeMs = 0; - if (!hasErrors || cfg.commitOnErrors) { - HashMap<String, String> checkpointCommitMetadata = new HashMap<>(); - checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr); - - if (hasErrors) { - log.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" - + totalErrorRecords + "/" + totalRecords); - } - - boolean success = client.commit(commitTime, writeStatusRDD, - Optional.of(checkpointCommitMetadata)); - if (success) { - log.info("Commit " + commitTime + " successful!"); - // Sync to hive if enabled - Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext(); - syncHive(); - hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0; - } else { - log.info("Commit " + commitTime + " failed!"); - } - } else { - log.error("There are errors when ingesting records. Errors/Total=" - + totalErrorRecords + "/" + totalRecords); - log.error("Printing out the top 100 errors"); - writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> { - log.error("Global error :", ws.getGlobalError()); - if (ws.getErrors().size() > 0) { - ws.getErrors().entrySet().forEach(r -> - log.trace("Error for key:" + r.getKey() + " is " + r.getValue())); - } - }); - } - client.close(); - long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0; - - // Send DeltaStreamer Metrics - metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs); - } - - public void syncHive() { - if (cfg.enableHiveSync) { - HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath); - log.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName - + "). 
Hive metastore URL :" + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath); - - new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable(); - } - } - /** - * Register Avro Schemas - * @param schemaProvider Schema Provider + * Main method to start syncing + * @throws Exception */ - private void registerAvroSchemas(SchemaProvider schemaProvider) { - // register the schemas, so that shuffle does not serialize the full schemas - if (null != schemaProvider) { - List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema()); - log.info("Registering Schema :" + schemas); - jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList()); + public void sync() throws Exception { + if (cfg.continuousMode) { + deltaSyncService.start(this::onDeltaSyncShutdown); + deltaSyncService.waitForShutdown(); + log.info("Delta Sync shutting down"); + } else { + log.info("Delta Streamer running only single round"); + deltaSyncService.getDeltaSync().syncOnce(); + deltaSyncService.close(); + log.info("Shut down deltastreamer"); } } - private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) { - HoodieWriteConfig.Builder builder = - HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath) - .withAutoCommit(false).combineInput(cfg.filterDupes, true) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withPayloadClass(cfg.payloadClassName) - // turn on inline compaction by default, for MOR tables - .withInlineCompaction(HoodieTableType.valueOf(cfg.storageType) == HoodieTableType.MERGE_ON_READ) - .build()) - .forTable(cfg.targetTableName) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .withProps(props); - if (null != schemaProvider) { - builder = builder.withSchema(schemaProvider.getTargetSchema().toString()); - } - - return builder.build(); + private boolean onDeltaSyncShutdown(boolean error) { + log.info("DeltaSync shutdown. Closing write client. Error?" + error); + deltaSyncService.close(); + return true; } public enum Operation { @@ -366,6 +139,7 @@ public class HoodieDeltaStreamer implements Serializable { } private static class OperationConvertor implements IStringConverter<Operation> { + @Override public Operation convert(String value) throws ParameterException { return Operation.valueOf(value); @@ -426,9 +200,9 @@ public class HoodieDeltaStreamer implements Serializable { @Parameter(names = {"--transformer-class"}, description = "subclass of com.uber.hoodie.utilities.transform.Transformer" - + ". Allows transforming raw source dataset to a target dataset (conforming to target schema) before writing." - + " Default : Not set. E:g - com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer (which allows" - + "a SQL query templated to be passed as a transformation function)") + + ". Allows transforming raw source dataset to a target dataset (conforming to target schema) before " + + "writing. Default : Not set. E:g - com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer (which " + + "allows a SQL query templated to be passed as a transformation function)") public String transformerClassName = null; @Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. 
" @@ -447,16 +221,57 @@ public class HoodieDeltaStreamer implements Serializable { @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive") public Boolean enableHiveSync = false; + @Parameter(names = {"--max-pending-compactions"}, + description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless" + + "outstanding compactions is less than this number") + public Integer maxPendingCompactions = 5; + + @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running" + + " source-fetch -> Transform -> Hudi Write in loop") + public Boolean continuousMode = false; + @Parameter(names = {"--spark-master"}, description = "spark master to use.") public String sparkMaster = "local[2]"; @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written") public Boolean commitOnErrors = false; + @Parameter(names = {"--delta-sync-scheduling-weight"}, description = + "Scheduling weight for delta sync as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer deltaSyncSchedulingWeight = 1; + + @Parameter(names = {"--compact-scheduling-weight"}, description = "Scheduling weight for compaction as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer compactSchedulingWeight = 1; + + @Parameter(names = {"--delta-sync-scheduling-minshare"}, description = "Minshare for delta sync as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer deltaSyncSchedulingMinShare = 0; + + @Parameter(names = {"--compact-scheduling-minshare"}, description = "Minshare for compaction as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer compactSchedulingMinShare = 0; + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; } + /** + * Helper to set Spark Scheduling Configs dynamically + * + * @param cfg Config + */ + public static Map<String, String> getSparkSchedulingConfigs(Config cfg) throws Exception { + Map<String, String> additionalSparkConfigs = new HashMap<>(); + if (cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) { + String sparkSchedulingConfFile = SchedulerConfGenerator.generateAndStoreConfig(cfg.deltaSyncSchedulingWeight, + cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare); + additionalSparkConfigs.put("spark.scheduler.allocation.file", sparkSchedulingConfFile); + } + return additionalSparkConfigs; + } + public static void main(String[] args) throws Exception { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, args); @@ -465,47 +280,288 @@ public class HoodieDeltaStreamer implements Serializable { System.exit(1); } - JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster); + Map<String, String> additionalSparkConfigs = getSparkSchedulingConfigs(cfg); + JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, + cfg.sparkMaster, additionalSparkConfigs); + if (!("FAIR".equals(jssc.getConf().get("spark.scheduler.mode"))) + && cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) { + log.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode " + + "is not set to FAIR at instatiation time. 
Continuing without scheduling configs"); + } new HoodieDeltaStreamer(cfg, jssc).sync(); } - public SourceFormatAdapter getFormatAdapter() { - return formatAdapter; - } - public SchemaProvider getSchemaProvider() { - return schemaProvider; - } + /** + * Syncs data either in single-run or in continuous mode. + */ + public static class DeltaSyncService extends AbstractDeltaStreamerService { + + /** + * Delta Sync Config + */ + private final HoodieDeltaStreamer.Config cfg; + + /** + * Schema provider that supplies the command for reading the input and writing out the target table. + */ + private transient SchemaProvider schemaProvider; + + /** + * Spark Session + */ + private transient SparkSession sparkSession; + + /** + * Spark context + */ + private transient JavaSparkContext jssc; + + /** + * Bag of properties with source, hoodie client, key generator etc. + */ + TypedProperties props; + + /** + * Async Compactor Service + */ + private AsyncCompactService asyncCompactService; + + /** + * Table Type + */ + private final HoodieTableType tableType; + + /** + * Delta Sync + */ + private transient DeltaSync deltaSync; + + public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) + throws IOException { + this.cfg = cfg; + this.jssc = jssc; + this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); + + if (fs.exists(new Path(cfg.targetBasePath))) { + HoodieTableMetaClient meta = new HoodieTableMetaClient( + new Configuration(fs.getConf()), cfg.targetBasePath, false); + tableType = meta.getTableType(); + } else { + tableType = HoodieTableType.valueOf(cfg.storageType); + } - public Transformer getTransformer() { - return transformer; - } + this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); + log.info("Creating delta streamer with configs : " + props.toString()); + this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); - public KeyGenerator getKeyGenerator() { - return keyGenerator; - } + if (cfg.filterDupes) { + cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; + } - public FileSystem getFs() { - return fs; - } + deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, tableType, + props, jssc, fs, hiveConf, this::onInitializingWriteClient); + } - public Optional<HoodieTimeline> getCommitTimelineOpt() { - return commitTimelineOpt; - } + public DeltaSync getDeltaSync() { + return deltaSync; + } - public JavaSparkContext getJssc() { - return jssc; - } + @Override + protected Pair<CompletableFuture, ExecutorService> startService() { + ExecutorService executor = Executors.newFixedThreadPool(1); + return Pair.of(CompletableFuture.supplyAsync(() -> { + boolean error = false; + if (cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)) { + // set Scheduler Pool. 
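Because delta-sync and compaction now run as parallel threads sharing one SparkContext, each thread tags its jobs with a scheduler pool; the tagging only takes effect when the application was submitted with spark.scheduler.mode=FAIR, which is what the warning in main() above checks. A sketch of the pattern the following lines apply (pool names are the constants from SchedulerConfGenerator later in this patch):

    // Sketch: route each thread's Spark jobs to its own FAIR pool.
    // Requires spark-submit --conf spark.scheduler.mode=FAIR plus the
    // generated allocation file wired in via spark.scheduler.allocation.file.
    jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME); // delta-sync thread
    // and correspondingly, in the compactor thread:
    // jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME);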
+ log.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME); + jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME); + } + try { + while (!isShutdownRequested()) { + try { + Optional<String> scheduledCompactionInstant = deltaSync.syncOnce(); + if (scheduledCompactionInstant.isPresent()) { + log.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")"); + asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, + scheduledCompactionInstant.get())); + asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); + } + } catch (Exception e) { + log.error("Shutting down delta-sync due to exception", e); + error = true; + throw new HoodieException(e.getMessage(), e); + } + } + } finally { + shutdownCompactor(error); + } + return true; + }, executor), executor); + } + + /** + * Shutdown compactor as DeltaSync is shutdown + */ + private void shutdownCompactor(boolean error) { + log.info("Delta Sync shutdown. Error ?" + error); + if (asyncCompactService != null) { + log.warn("Gracefully shutting down compactor"); + asyncCompactService.shutdown(false); + } + } + + /** + * Callback to initialize write client and start compaction service if required + * @param writeClient HoodieWriteClient + * @return + */ + protected Boolean onInitializingWriteClient(HoodieWriteClient writeClient) { + if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { + asyncCompactService = new AsyncCompactService(jssc, writeClient); + // Enqueue existing pending compactions first + HoodieTableMetaClient meta = new HoodieTableMetaClient( + new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true); + List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta); + pending.stream().forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant)); + asyncCompactService.start((error) -> { + // Shutdown DeltaSync + shutdown(false); + return true; + }); + try { + asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); + } catch (InterruptedException ie) { + throw new HoodieException(ie); + } + } + return true; + } + + /** + * Close all resources + */ + public void close() { + if (null != deltaSync) { + deltaSync.close(); + } + } + + public SchemaProvider getSchemaProvider() { + return schemaProvider; + } + + public SparkSession getSparkSession() { + return sparkSession; + } + + public JavaSparkContext getJavaSparkContext() { + return jssc; + } + + public AsyncCompactService getAsyncCompactService() { + return asyncCompactService; + } - public SparkSession getSparkSession() { - return sparkSession; + public TypedProperties getProps() { + return props; + } } - public HiveConf getHiveConf() { - return hiveConf; + /** + * Async Compactor Service tha runs in separate thread. Currently, only one compactor is allowed to run at any time. 
+ */ + public static class AsyncCompactService extends AbstractDeltaStreamerService { + + private final int maxConcurrentCompaction; + private transient Compactor compactor; + private transient JavaSparkContext jssc; + private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>(); + private transient ReentrantLock queueLock = new ReentrantLock(); + private transient Condition consumed = queueLock.newCondition(); + + public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) { + this.jssc = jssc; + this.compactor = new Compactor(client, jssc); + //TODO: HUDI-157 : Only allow 1 compactor to run in parallel till Incremental View on MOR is fully implemented. + this.maxConcurrentCompaction = 1; + } + + /** + * Enqueues new Pending compaction + */ + public void enqueuePendingCompaction(HoodieInstant instant) { + pendingCompactions.add(instant); + } + + /** + * Wait till outstanding pending compactions reduces to the passed in value + * @param numPendingCompactions Maximum pending compactions allowed + * @throws InterruptedException + */ + public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException { + try { + queueLock.lock(); + while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) { + consumed.await(); + } + } finally { + queueLock.unlock(); + } + } + + /** + * Fetch Next pending compaction if available + * @return + * @throws InterruptedException + */ + private HoodieInstant fetchNextCompactionInstant() throws InterruptedException { + log.info("Compactor waiting for next instant for compaction upto 60 seconds"); + HoodieInstant instant = pendingCompactions.poll(60, TimeUnit.SECONDS); + if (instant != null) { + try { + queueLock.lock(); + // Signal waiting thread + consumed.signal(); + } finally { + queueLock.unlock(); + } + } + return instant; + } + + /** + * Start Compaction Service + */ + protected Pair<CompletableFuture, ExecutorService> startService() { + ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction); + List<CompletableFuture<Boolean>> compactionFutures = + IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> { + try { + // Set Compactor Pool Name for allowing users to prioritize compaction + log.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME); + jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME); + + while (!isShutdownRequested()) { + final HoodieInstant instant = fetchNextCompactionInstant(); + if (null != instant) { + compactor.compact(instant); + } + } + log.info("Compactor shutting down properly!!"); + } catch (InterruptedException ie) { + log.warn("Compactor executor thread got interrupted exception. 
Stopping", ie); + } catch (IOException e) { + log.error("Compactor executor failed", e); + throw new HoodieIOException(e.getMessage(), e); + } + return true; + }, executor)).collect(Collectors.toList()); + return Pair.of(CompletableFuture.allOf(compactionFutures.stream().toArray(CompletableFuture[]::new)), executor); + } } - public TypedProperties getProps() { - return props; + public DeltaSyncService getDeltaSyncService() { + return deltaSyncService; } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java new file mode 100644 index 0000000..fdc35e4 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.uber.hoodie.utilities.deltastreamer; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.commons.lang.text.StrSubstitutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Utility Class to generate Spark Scheduling allocation file. 
This kicks in only when user + * sets spark.scheduler.mode=FAIR at spark-submit time + */ +public class SchedulerConfGenerator { + + protected static volatile Logger log = LogManager.getLogger(SchedulerConfGenerator.class); + + public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync"; + public static final String COMPACT_POOL_NAME = "hoodiecompact"; + + + private static final String DELTASYNC_POOL_KEY = "deltasync_pool"; + private static final String COMPACT_POOL_KEY = "compact_pool"; + private static final String DELTASYNC_POLICY_KEY = "deltasync_policy"; + private static final String COMPACT_POLICY_KEY = "compact_policy"; + private static final String DELTASYNC_WEIGHT_KEY = "deltasync_weight"; + private static final String DELTASYNC_MINSHARE_KEY = "deltasync_minshare"; + private static final String COMPACT_WEIGHT_KEY = "compact_weight"; + private static final String COMPACT_MINSHARE_KEY = "compact_minshare"; + + private static String SPARK_SCHEDULING_PATTERN = + "<?xml version=\"1.0\"?>\n" + + "<allocations>\n" + + " <pool name=\"%(deltasync_pool)\">\n" + + " <schedulingMode>%(deltasync_policy)</schedulingMode>\n" + + " <weight>%(deltasync_weight)</weight>\n" + + " <minShare>%(deltasync_minshare)</minShare>\n" + + " </pool>\n" + + " <pool name=\"%(compact_pool)\">\n" + + " <schedulingMode>%(compact_policy)</schedulingMode>\n" + + " <weight>%(compact_weight)</weight>\n" + + " <minShare>%(compact_minshare)</minShare>\n" + + " </pool>\n" + + "</allocations>"; + + private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare, + Integer compactionMinShare) { + Map<String, String> schedulingProps = new HashMap<>(); + schedulingProps.put(DELTASYNC_POOL_KEY, DELTASYNC_POOL_NAME); + schedulingProps.put(COMPACT_POOL_KEY, COMPACT_POOL_NAME); + schedulingProps.put(DELTASYNC_POLICY_KEY, "FAIR"); + schedulingProps.put(COMPACT_POLICY_KEY, "FAIR"); + schedulingProps.put(DELTASYNC_WEIGHT_KEY, deltaSyncWeight.toString()); + schedulingProps.put(DELTASYNC_MINSHARE_KEY, deltaSyncMinShare.toString()); + schedulingProps.put(COMPACT_WEIGHT_KEY, compactionWeight.toString()); + schedulingProps.put(COMPACT_MINSHARE_KEY, compactionMinShare.toString()); + + StrSubstitutor sub = new StrSubstitutor(schedulingProps, "%(", ")"); + String xmlString = sub.replace(SPARK_SCHEDULING_PATTERN); + log.info("Scheduling Configurations generated. 
Config=\n" + xmlString); + return xmlString; + } + + public static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight, + Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException { + File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml"); + BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile)); + bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare)); + bw.close(); + log.info("Configs written to file" + tempConfigFile.getAbsolutePath()); + return tempConfigFile.getAbsolutePath(); + } +} diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java index 14a7be5..68a5c8d 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java @@ -24,11 +24,13 @@ import static org.junit.Assert.fail; import com.uber.hoodie.DataSourceWriteOptions; import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.DFSPropertiesConfiguration; import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.exception.DatasetNotFoundException; import com.uber.hoodie.hive.HiveSyncConfig; import com.uber.hoodie.hive.HoodieHiveClient; @@ -36,17 +38,28 @@ import com.uber.hoodie.hive.MultiPartKeysValueExtractor; import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer; import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider; +import com.uber.hoodie.utilities.sources.DistributedTestDataSource; import com.uber.hoodie.utilities.sources.HoodieIncrSource; +import com.uber.hoodie.utilities.sources.InputBatch; import com.uber.hoodie.utilities.sources.TestDataSource; +import com.uber.hoodie.utilities.sources.config.TestSourceConfig; import com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer; import com.uber.hoodie.utilities.transform.Transformer; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -57,6 +70,7 @@ import org.apache.spark.sql.functions; import org.apache.spark.sql.types.DataTypes; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -197,6 +211,22 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { assertEquals(expected, recordCount); } + static void assertAtleastNCompactionCommits(int minExpected, String 
datasetPath, FileSystem fs) { + HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + log.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); + int numCompactionCommits = (int)timeline.getInstants().count(); + assertTrue("Got=" + numCompactionCommits + ", exp >=" + minExpected, minExpected <= numCompactionCommits); + } + + static void assertAtleastNDeltaCommits(int minExpected, String datasetPath, FileSystem fs) { + HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath); + HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); + log.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); + int numDeltaCommits = (int)timeline.getInstants().count(); + assertTrue("Got=" + numDeltaCommits + ", exp >=" + minExpected, minExpected <= numDeltaCommits); + } + static String assertCommitMetadata(String expected, String datasetPath, FileSystem fs, int totalCommits) throws IOException { HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath); @@ -208,6 +238,23 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { assertEquals(expected, commitMetadata.getMetadata(HoodieDeltaStreamer.CHECKPOINT_KEY)); return lastInstant.getTimestamp(); } + + static void waitTillCondition(Function<Boolean, Boolean> condition, long timeoutInSecs) throws Exception { + Future<Boolean> res = Executors.newSingleThreadExecutor().submit(() -> { + boolean ret = false; + while (!ret) { + try { + Thread.sleep(3000); + ret = condition.apply(true); + } catch (Throwable error) { + log.warn("Got error :", error); + ret = false; + } + } + return true; + }); + res.get(timeoutInSecs, TimeUnit.SECONDS); + } } @Test @@ -261,6 +308,51 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { assertEquals(2000, counts.get(0).getLong(1)); } + @Test + public void testUpsertsCOWContinuousMode() throws Exception { + testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow"); + } + + @Test + public void testUpsertsMORContinuousMode() throws Exception { + testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor"); + } + + private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception { + String datasetBasePath = dfsBasePath + "/" + tempDir; + // Keep it higher than batch-size to test continuous mode + int totalRecords = 3000; + + // Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.UPSERT); + cfg.continuousMode = true; + cfg.storageType = tableType.name(); + cfg.configs.add(String.format("%s=%d", TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> { + try { + ds.sync(); + } catch (Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + }); + + TestHelpers.waitTillCondition((r) -> { + if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { + TestHelpers.assertAtleastNDeltaCommits(5, datasetBasePath, dfs); + TestHelpers.assertAtleastNCompactionCommits(2, datasetBasePath, dfs); + } else { + TestHelpers.assertAtleastNCompactionCommits(5, datasetBasePath, dfs); 
+ } + TestHelpers.assertRecordCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertDistanceCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext); + return true; + }, 180); + ds.shutdownGracefully(); + dsFuture.get(); + } + /** * Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline * The first step involves using a SQL template to transform a source @@ -366,6 +458,20 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { assertEquals(1000, counts.get(1).getLong(1)); } + @Test + public void testDistributedTestDataSource() throws Exception { + TypedProperties props = new TypedProperties(); + props.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, "1000"); + props.setProperty(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, "1"); + props.setProperty(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true"); + DistributedTestDataSource distributedTestDataSource = new DistributedTestDataSource(props, + jsc, sparkSession, null); + InputBatch<JavaRDD<GenericRecord>> batch = distributedTestDataSource.fetchNext(Optional.empty(), 10000000); + batch.getBatch().get().cache(); + long c = batch.getBatch().get().count(); + Assert.assertEquals(1000, c); + } + /** * UDF to calculate Haversine distance */ diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java new file mode 100644 index 0000000..22b1324 --- /dev/null +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java @@ -0,0 +1,103 @@ +package com.uber.hoodie.utilities.sources; + +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.common.util.collection.RocksDBBasedMap; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.utilities.schema.SchemaProvider; +import com.uber.hoodie.utilities.sources.config.TestSourceConfig; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +public abstract class AbstractBaseTestSource extends AvroSource { + + // Static instance, helps with reuse across a test. + protected static transient HoodieTestDataGenerator dataGenerator; + + public static void initDataGen() { + dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); + } + + public static void initDataGen(TypedProperties props) { + try { + boolean useRocksForTestDataGenKeys = props.getBoolean(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, + TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS); + String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, null); + if (null == baseStoreDir) { + baseStoreDir = File.createTempFile("test_data_gen", ".keys").getParent(); + } + log.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir); + dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, + useRocksForTestDataGenKeys ? 
new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>()); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + } + + public static void resetDataGen() { + if (null != dataGenerator) { + dataGenerator.close(); + } + dataGenerator = null; + } + + protected AbstractBaseTestSource(TypedProperties props, + JavaSparkContext sparkContext, SparkSession sparkSession, + SchemaProvider schemaProvider) { + super(props, sparkContext, sparkSession, schemaProvider); + } + + protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime) { + int maxUniqueKeys = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, + TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS); + + // generate `sourceLimit` number of upserts each time. + int numExistingKeys = dataGenerator.getNumExistingKeys(); + log.info("NumExistingKeys=" + numExistingKeys); + + int numUpdates = Math.min(numExistingKeys, sourceLimit / 2); + int numInserts = sourceLimit - numUpdates; + log.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates); + + if (numInserts + numExistingKeys > maxUniqueKeys) { + // Limit inserts so that maxUniqueRecords is maintained + numInserts = Math.max(0, maxUniqueKeys - numExistingKeys); + } + + if ((numInserts + numUpdates) < sourceLimit) { + // try to expand updates to safe limit + numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts); + } + + log.info("NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys); + long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + log.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory() + + ", Free Memory=" + Runtime.getRuntime().freeMemory()); + + List<GenericRecord> records = new ArrayList<>(); + Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates) + .map(AbstractBaseTestSource::toGenericRecord); + Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts) + .map(AbstractBaseTestSource::toGenericRecord); + return Stream.concat(updateStream, insertStream); + } + + private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) { + try { + Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema); + return (GenericRecord) recordOpt.get(); + } catch (IOException e) { + return null; + } + } +} diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java new file mode 100644 index 0000000..533e25e --- /dev/null +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.sources; + +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.utilities.schema.SchemaProvider; +import com.uber.hoodie.utilities.sources.config.TestSourceConfig; +import java.util.Iterator; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +/** + * A Test DataSource which scales test-data generation by using spark parallelism. + */ +public class DistributedTestDataSource extends AbstractBaseTestSource { + + private final int numTestSourcePartitions; + + public DistributedTestDataSource(TypedProperties props, + JavaSparkContext sparkContext, SparkSession sparkSession, + SchemaProvider schemaProvider) { + super(props, sparkContext, sparkSession, schemaProvider); + this.numTestSourcePartitions = props.getInteger(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, + TestSourceConfig.DEFAULT_NUM_SOURCE_PARTITIONS); + } + + @Override + protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Optional<String> lastCkptStr, long sourceLimit) { + int nextCommitNum = lastCkptStr.map(s -> Integer.parseInt(s) + 1).orElse(0); + String commitTime = String.format("%05d", nextCommitNum); + log.info("Source Limit is set to " + sourceLimit); + + // No new data. + if (sourceLimit <= 0) { + return new InputBatch<>(Optional.empty(), commitTime); + } + + TypedProperties newProps = new TypedProperties(); + newProps.putAll(props); + + // Set the maxUniqueRecords per partition for TestDataSource + int maxUniqueRecords = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, + TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS); + String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / numTestSourcePartitions)); + newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition); + int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions)); + JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed() + .collect(Collectors.toList()), numTestSourcePartitions).mapPartitions(idx -> { + log.info("Initializing source with newProps=" + newProps); + if (null == dataGenerator) { + initDataGen(newProps); + } + Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime).iterator(); + return itr; + }); + return new InputBatch<>(Optional.of(avroRDD), commitTime); + } +} diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java index bc46472..e4bd4ff 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java @@ -18,17 +18,12 @@ package com.uber.hoodie.utilities.sources; -import com.uber.hoodie.common.HoodieTestDataGenerator; -import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.util.TypedProperties; import com.uber.hoodie.utilities.schema.SchemaProvider; -import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import 
java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -38,32 +33,15 @@ import org.apache.spark.sql.SparkSession; /** * An implementation of {@link Source}, that emits test upserts. */ -public class TestDataSource extends AvroSource { +public class TestDataSource extends AbstractBaseTestSource { private static volatile Logger log = LogManager.getLogger(TestDataSource.class); - // Static instance, helps with reuse across a test. - private static HoodieTestDataGenerator dataGenerator; - - public static void initDataGen() { - dataGenerator = new HoodieTestDataGenerator(); - } - - public static void resetDataGen() { - dataGenerator = null; - } - public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); - } - - private GenericRecord toGenericRecord(HoodieRecord hoodieRecord) { - try { - Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema); - return (GenericRecord) recordOpt.get(); - } catch (IOException e) { - return null; + if (null == dataGenerator) { + initDataGen(props); } } @@ -73,26 +51,14 @@ public class TestDataSource extends AvroSource { int nextCommitNum = lastCheckpointStr.map(s -> Integer.parseInt(s) + 1).orElse(0); String commitTime = String.format("%05d", nextCommitNum); + log.info("Source Limit is set to " + sourceLimit); + // No new data. if (sourceLimit <= 0) { return new InputBatch<>(Optional.empty(), commitTime); } - // generate `sourceLimit` number of upserts each time. - int numExistingKeys = dataGenerator.getExistingKeysList().size(); - int numUpdates = Math.min(numExistingKeys, (int) sourceLimit / 2); - int numInserts = (int) sourceLimit - numUpdates; - - List<GenericRecord> records = new ArrayList<>(); - try { - records.addAll(dataGenerator.generateUniqueUpdates(commitTime, numUpdates).stream() - .map(this::toGenericRecord).collect(Collectors.toList())); - records.addAll(dataGenerator.generateInserts(commitTime, numInserts).stream() - .map(this::toGenericRecord).collect(Collectors.toList())); - } catch (IOException e) { - log.error("Error generating test data.", e); - } - + List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime).collect(Collectors.toList()); JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4); return new InputBatch<>(Optional.of(avroRDD), commitTime); } diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/config/TestSourceConfig.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/config/TestSourceConfig.java new file mode 100644 index 0000000..0f63221 --- /dev/null +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/config/TestSourceConfig.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.sources.config; + +/** + * Configurations for Test Data Sources + */ +public class TestSourceConfig { + + // Used by DistributedTestDataSource only. Number of partitions, each of which generates test data + public static final String NUM_SOURCE_PARTITIONS_PROP = "hoodie.deltastreamer.source.test.num_partitions"; + public static final Integer DEFAULT_NUM_SOURCE_PARTITIONS = 10; + + // Maximum number of unique records generated for the run + public static final String MAX_UNIQUE_RECORDS_PROP = "hoodie.deltastreamer.source.test.max_unique_records"; + public static final Integer DEFAULT_MAX_UNIQUE_RECORDS = Integer.MAX_VALUE; + + // Use RocksDB for storing datagen keys + public static final String USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS = + "hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys"; + public static final Boolean DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS = false; + + // Base dir for storing datagen keys + public static final String ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS = + "hoodie.deltastreamer.source.test.datagen.rocksdb_base_dir"; + +} diff --git a/packaging/hoodie-utilities-bundle/pom.xml b/packaging/hoodie-utilities-bundle/pom.xml index a3842ab..76d478d 100644 --- a/packaging/hoodie-utilities-bundle/pom.xml +++ b/packaging/hoodie-utilities-bundle/pom.xml @@ -66,6 +66,7 @@ <includes> <include>commons-codec:commons-codec</include> <include>commons-dbcp:commons-dbcp</include> + <include>commons-lang:commons-lang</include> <include>commons-pool:commons-pool</include> <include>com.uber.hoodie:hoodie-common</include> <include>com.uber.hoodie:hoodie-client</include> @@ -110,6 +111,10 @@ <shadedPattern>com.uber.hoodie.org.apache.commons.dbcp.</shadedPattern> </relocation> <relocation> + <pattern>org.apache.commons.lang.</pattern> + <shadedPattern>com.uber.hoodie.org.apache.commons.lang.</shadedPattern> + </relocation> + <relocation> <pattern>org.apache.commons.pool.</pattern> <shadedPattern>com.uber.hoodie.org.apache.commons.pool.</shadedPattern> </relocation> diff --git a/pom.xml b/pom.xml index 44e9dd8..59437ec 100644 --- a/pom.xml +++ b/pom.xml @@ -575,6 +575,11 @@ <version>1.4</version> </dependency> <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.6</version> + </dependency> + <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version>
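
For readers trying out the new continuous mode, testUpsertsContinuousMode above doubles as the clearest usage example of the API added by this commit. Below is a minimal standalone sketch distilled from that test; it is an illustration, not part of the commit. It assumes HoodieDeltaStreamer.Config can be instantiated directly (it is normally populated by JCommander from command-line arguments), and it elides the required settings (target base path, source class, schema provider, and so on) that TestHelpers.makeConfig fills in for the test.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.api.java.JavaSparkContext;

public class ContinuousModeSketch {
  public static void main(String[] args) throws Exception {
    JavaSparkContext jsc = new JavaSparkContext("local[4]", "delta-streamer-continuous-sketch");

    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); // assumed no-arg ctor (JCommander-style config)
    cfg.continuousMode = true;                              // loop delta-sync instead of a one-shot run
    cfg.storageType = HoodieTableType.MERGE_ON_READ.name(); // MOR storage gets concurrent compaction
    // Target base path, source class, schema provider etc. elided here;
    // see TestHelpers.makeConfig in the test above for a complete setup.

    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<?> ingest = pool.submit(() -> {
      try {
        ds.sync(); // in continuous mode this blocks until shutdown is requested
      } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
      }
    });

    // ... later, from a controller thread or shutdown hook:
    ds.shutdownGracefully(); // stops the delta-sync and compaction threads
    ingest.get();
    pool.shutdown();
    jsc.stop();
  }
}

One operational detail worth repeating from SchedulerConfGenerator's javadoc: the generated FAIR pools for delta-sync and compaction take effect only when the job is submitted with spark.scheduler.mode=FAIR; under Spark's default FIFO scheduler the generated allocation file is ignored.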