This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f93e64f  [HUDI-681]Remove embeddedTimelineService from HoodieReadClient (#1388)
f93e64f is described below

commit f93e64fee413ed1b774156e688794ee7937cc01a
Author: hongdd <jn_...@163.com>
AuthorDate: Mon Mar 9 18:31:04 2020 +0800

    [HUDI-681]Remove embeddedTimelineService from HoodieReadClient (#1388)
    
    * [HUDI-681]Remove embeddedTimelineService from HoodieReadClient
---
 .../org/apache/hudi/client/HoodieReadClient.java    | 21 ++-------------------
 .../main/java/org/apache/hudi/DataSourceUtils.java  | 10 ++++------
 .../org/apache/hudi/HoodieSparkSqlWriter.scala      |  5 +----
 .../hudi/utilities/deltastreamer/DeltaSync.java     |  2 +-
 4 files changed, 8 insertions(+), 30 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index 33d661b..d1e92b5 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -72,18 +71,10 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
   /**
    * @param basePath path to Hoodie table
    */
-  public HoodieReadClient(JavaSparkContext jsc, String basePath, Option<EmbeddedTimelineService> timelineService) {
+  public HoodieReadClient(JavaSparkContext jsc, String basePath) {
     this(jsc, HoodieWriteConfig.newBuilder().withPath(basePath)
         // by default we use HoodieBloomIndex
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(),
-        timelineService);
-  }
-
-  /**
-   * @param basePath path to Hoodie table
-   */
-  public HoodieReadClient(JavaSparkContext jsc, String basePath) {
-    this(jsc, basePath, Option.empty());
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build());
   }
 
   /**
@@ -100,14 +91,6 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
    * @param clientConfig instance of HoodieWriteConfig
    */
   public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
-    this(jsc, clientConfig, Option.empty());
-  }
-
-  /**
-   * @param clientConfig instance of HoodieWriteConfig
-   */
-  public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
-      Option<EmbeddedTimelineService> timelineService) {
     this.jsc = jsc;
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 6a4ad03..99a795d 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -23,11 +23,9 @@ import org.apache.avro.Schema;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.TypedProperties;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -222,9 +220,9 @@ public class DataSourceUtils {
 
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
-                                                     HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
+                                                     HoodieWriteConfig writeConfig) {
     try {
-      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
+      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig);
       return client.tagLocation(incomingHoodieRecords)
           .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
     } catch (TableNotFoundException e) {
@@ -236,10 +234,10 @@ public class DataSourceUtils {
 
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
-                                                     Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
+                                                     Map<String, String> parameters) {
     HoodieWriteConfig writeConfig =
         HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
-    return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService);
+    return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
   }
 
   public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) {
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 80a01d3..326595f 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -131,10 +131,7 @@ private[hudi] object HoodieSparkSqlWriter {
 
       val hoodieRecords =
         if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-          DataSourceUtils.dropDuplicates(
-            jsc,
-            hoodieAllIncomingRecords,
-            mapAsJavaMap(parameters), client.getTimelineServer)
+          DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
         } else {
           hoodieAllIncomingRecords
         }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 97d3d42..4b69d22 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -357,7 +357,7 @@ public class DeltaSync implements Serializable {
     if (cfg.filterDupes) {
       // turn upserts to insert
       cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
-      records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(), writeClient.getTimelineServer());
+      records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
     }
 
     boolean isEmpty = records.isEmpty();
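
For reference, a minimal sketch of the call shapes after this commit: the HoodieReadClient constructors and the DataSourceUtils.dropDuplicates overloads no longer take an Option<EmbeddedTimelineService>. The Spark setup and table path below are illustrative assumptions, not part of the patch, and assume an existing Hudi table at basePath:

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadClientUsageSketch {
  public static void main(String[] args) {
    // Illustrative Spark setup; any existing JavaSparkContext works.
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("hudi-681-sketch").setMaster("local[2]"));
    String basePath = "/tmp/hudi/sample-table"; // hypothetical path to an existing Hudi table

    // Post-change: two-argument constructor; defaults to HoodieBloomIndex internally.
    HoodieReadClient client = new HoodieReadClient<>(jsc, basePath);

    // dropDuplicates now takes only the Spark context, the records, and the write config.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath).build();
    JavaRDD<HoodieRecord> incoming = jsc.emptyRDD(); // stand-in for real incoming records
    JavaRDD<HoodieRecord> deduped = DataSourceUtils.dropDuplicates(jsc, incoming, writeConfig);

    jsc.stop();
  }
}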
