This is an automated email from the ASF dual-hosted git repository. vinoyang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new f93e64f [HUDI-681]Remove embeddedTimelineService from HoodieReadClient (#1388) f93e64f is described below commit f93e64fee413ed1b774156e688794ee7937cc01a Author: hongdd <jn_...@163.com> AuthorDate: Mon Mar 9 18:31:04 2020 +0800 [HUDI-681]Remove embeddedTimelineService from HoodieReadClient (#1388) * [HUDI-681]Remove embeddedTimelineService from HoodieReadClient --- .../org/apache/hudi/client/HoodieReadClient.java | 21 ++------------------- .../main/java/org/apache/hudi/DataSourceUtils.java | 10 ++++------ .../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +---- .../hudi/utilities/deltastreamer/DeltaSync.java | 2 +- 4 files changed, 8 insertions(+), 30 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java index 33d661b..d1e92b5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java @@ -19,7 +19,6 @@ package org.apache.hudi.client; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -72,18 +71,10 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ /** * @param basePath path to Hoodie table */ - public HoodieReadClient(JavaSparkContext jsc, String basePath, Option<EmbeddedTimelineService> timelineService) { + public HoodieReadClient(JavaSparkContext jsc, String basePath) { this(jsc, HoodieWriteConfig.newBuilder().withPath(basePath) // by default we use HoodieBloomIndex - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(), - timelineService); - } - - /** - * @param basePath path to Hoodie table - */ - public HoodieReadClient(JavaSparkContext jsc, String basePath) { - this(jsc, basePath, Option.empty()); + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build()); } /** @@ -100,14 +91,6 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ * @param clientConfig instance of HoodieWriteConfig */ public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) { - this(jsc, clientConfig, Option.empty()); - } - - /** - * @param clientConfig instance of HoodieWriteConfig - */ - public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, - Option<EmbeddedTimelineService> timelineService) { this.jsc = jsc; final String basePath = clientConfig.getBasePath(); // Create a Hoodie table which encapsulated the commits and files visible diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 6a4ad03..99a795d 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -23,11 +23,9 @@ import org.apache.avro.Schema; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.config.HoodieCompactionConfig; @@ -222,9 +220,9 @@ public class DataSourceUtils { @SuppressWarnings("unchecked") public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords, - HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) { + HoodieWriteConfig writeConfig) { try { - HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService); + HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig); return client.tagLocation(incomingHoodieRecords) .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown()); } catch (TableNotFoundException e) { @@ -236,10 +234,10 @@ public class DataSourceUtils { @SuppressWarnings("unchecked") public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords, - Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) { + Map<String, String> parameters) { HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build(); - return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService); + return dropDuplicates(jssc, incomingHoodieRecords, writeConfig); } public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) { diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 80a01d3..326595f 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -131,10 +131,7 @@ private[hudi] object HoodieSparkSqlWriter { val hoodieRecords = if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) { - DataSourceUtils.dropDuplicates( - jsc, - hoodieAllIncomingRecords, - mapAsJavaMap(parameters), client.getTimelineServer) + DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters)) } else { hoodieAllIncomingRecords } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 97d3d42..4b69d22 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -357,7 +357,7 @@ public class DeltaSync implements Serializable { if (cfg.filterDupes) { // turn upserts to insert cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; - records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(), writeClient.getTimelineServer()); + records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig()); } boolean isEmpty = records.isEmpty();