[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-04-12 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r407235240
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/HoodieSimpleIndex.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkConfigUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.index.HoodieIndexUtils.loadLatestDataFilesForAllPartitions;
+
+/**
+ * A simple index which reads interested fields (record key and partition path) from base files and
+ * joins with incoming records to find the tagged location.
+ *
+ * @param <T> type of the record payload
+ */
+public class HoodieSimpleIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
+
+  public HoodieSimpleIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
+      HoodieTable<T> hoodieTable) {
+    return writeStatusRDD;
+  }
+
+  @Override
+  public boolean rollbackCommit(String commitTime) {
+    return true;
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return false;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return false;
+  }
+
+  @Override
+  public boolean isImplicitWithStorage() {
+    return true;
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+      HoodieTable<T> hoodieTable) {
+    if (config.getSimpleIndexUseCaching()) {
+      recordRDD.persist(SparkConfigUtils.getBloomIndexInputStorageLevel(config.getProps()));
+    }
+
+    JavaPairRDD<HoodieKey, HoodieRecord<T>> incomingRecords =
+        recordRDD.mapToPair(entry -> new Tuple2<>(entry.getKey(), entry));
+
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> existingRecords =
+        fetchRecordLocations(incomingRecords.keys(), jsc, hoodieTable);
+
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Tagging incoming records with record location");
+    JavaRDD<Tuple2<HoodieKey, Tuple2<HoodieRecord<T>, Option<HoodieRecordLocation>>>> untaggedRecordsRDD =
+        incomingRecords.leftOuterJoin(existingRecords)
+            .map(entry -> new Tuple2<>(entry._1, new Tuple2<>(entry._2._1, Option.ofNullable(entry._2._2.orNull()))));
+
+    JavaRDD<HoodieRecord<T>> taggedRecordRDD =
+        untaggedRecordsRDD.map(entry -> getTaggedRecord(entry._2._1, entry._2._2));
+
+    if (config.getSimpleIndexUseCaching()) {
+      recordRDD.unpersist(); // unpersist the input record RDD
+    }
+    return taggedRecordRDD;
+  }
+
+  /**
+   * Returns an RDD mapping each HoodieKey to the partitionPath/fileId which contains it, or Option.empty() if the
+   * key is not found.
+   *
+   * @param hoodieKeys  keys to look up
+   * @param jsc         spark context
+   * @param hoodieTable hoodie table object
+   */
+  @Override
+  public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+      JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
+    JavaPairRDD<HoodieKey, Option<HoodieRecordLocation>> incomingRecords =
+        hoodieKeys.mapToPair(entry -> new Tuple2<>(entry, Option.empty()));
+
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> existingRecords =
+        fetchRecordLocations(hoodieKeys, jsc, hoodieTable);
+
+    jsc.setJobGroup(this.getClass().getSimpleName(), "Joining existing records with

[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-04-07 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r405076523
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieSimpleIndex.java
 ##
 @@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A simple index which reads interested fields from parquet and joins with incoming records to find the tagged
+ * location.
+ *
+ * @param <T> type of the record payload
+ */
+public class HoodieSimpleIndex<T extends HoodieRecordPayload> extends HoodieBloomIndex<T> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSimpleIndex.class);
+
+  public HoodieSimpleIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  /**
+   * Returns an RDD mapping each HoodieKey to the partitionPath/fileId which contains it, or Option.empty() if the
+   * key is not found.
+   *
+   * @param hoodieKeys  keys to look up
+   * @param jsc         spark context
+   * @param hoodieTable hoodie table object
+   */
+  @Override
+  public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+      JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
+    JavaPairRDD<String, String> partitionRecordKeyPairRDD =
+        hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
+
+    // Lookup indexes for all the partition/recordkey pairs
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> recordKeyLocationRDD =
+        lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
+
+    JavaPairRDD<HoodieKey, String> keyHoodieKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key, null));
+
+    return keyHoodieKeyPairRDD.leftOuterJoin(recordKeyLocationRDD).mapToPair(keyLoc -> {
+      Option<Pair<String, String>> partitionPathFileidPair;
+      if (keyLoc._2._2.isPresent()) {
+        partitionPathFileidPair = Option.of(Pair.of(keyLoc._1().getPartitionPath(), keyLoc._2._2.get().getFileId()));
+      } else {
+        partitionPathFileidPair = Option.empty();
+      }
+      return new Tuple2<>(keyLoc._1, partitionPathFileidPair);
+    });
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+      HoodieTable<T> hoodieTable) {
+
+    // Step 0: cache the input record RDD
+    if (config.getBloomIndexUseCaching()) {
+      recordRDD.persist(config.getBloomIndexInputStorageLevel());
+    }
+
+    // Step 1: Extract out a thinner JavaPairRDD of (partitionPath, recordKey)
+    JavaPairRDD<String, String> partitionRecordKeyPairRDD =
+        recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));
+
+    // Lookup indexes for all the partition/recordkey pairs
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD =
+        lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
+
+    // Cache the result, for subsequent stages.
+    if (config.getBloomIndexUseCaching()) {
+

[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-04-07 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r405076331
 
 

 ##
 File path: 
hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java
 ##
 @@ -79,9 +82,22 @@
 import static org.mockito.Mockito.when;
 
 @SuppressWarnings("unchecked")
+@RunWith(Parameterized.class)
 public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
 
   private static final Logger LOG = LogManager.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
+  private final IndexType indexType;
+
+  @Parameterized.Parameters(name = "{index}: Test with IndexType={0}")
+  public static Collection<Object[]> data() {
+    Object[][] data =
+        new Object[][] {{IndexType.BLOOM}, {IndexType.GLOBAL_BLOOM}, {IndexType.SIMPLE}};
 
 Review comment:
   Could be okay then


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-03-30 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r400430648
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieSimpleIndex.java
 ##
 @@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.func.LazyIterableIterator;
+import org.apache.hudi.table.HoodieTable;
+
+import com.clearspring.analytics.util.Lists;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A simple index which reads interested fields (record key and partition path) from parquet and joins with
+ * incoming records to find the tagged location.
+ *
+ * @param <T> type of the record payload
+ */
+public class HoodieSimpleIndex<T extends HoodieRecordPayload> extends HoodieBloomIndex<T> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSimpleIndex.class);
+
+  public HoodieSimpleIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+      HoodieTable<T> hoodieTable) {
+
+    // Step 0: cache the input record RDD
+    if (config.getSimpleIndexUseCaching()) {
+      recordRDD.persist(config.getSimpleIndexInputStorageLevel());
+    }
+
+    // Step 1: Extract out a thinner JavaPairRDD of (partitionPath, recordKey)
+    JavaPairRDD<String, String> partitionRecordKeyPairRDD =
+        recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));
+
+    // Step 2: Load all involved files as <partition, file> pairs
+    List<String> affectedPartitionPathList =
+        partitionRecordKeyPairRDD.map(tuple -> tuple._1).distinct().collect();
+    JavaRDD<Tuple2<String, HoodieDataFile>> fileInfoList = jsc.parallelize(
+        loadAllFilesForPartitions(affectedPartitionPathList, jsc, hoodieTable))
+        .sortBy(Tuple2::_1, true, config.getSimpleIndexParallelism());
+
+    // Step 3: Lookup indexes for all the partition/recordkey pairs
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD =
+        findMatchingFilesForRecordKeys(fileInfoList, partitionRecordKeyPairRDD, hoodieTable);
+
+    // Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys
+    JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD);
+
+    if (config.getSimpleIndexUseCaching()) {
+      recordRDD.unpersist(); // unpersist the input record RDD
+    }
+    return taggedRecordRDD;
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
+      HoodieTable<T> hoodieTable) {
+    return writeStatusRDD;
+  }
+
+  /**
+   * Looks up the location for each record key and returns the (recordKey, location) pair for all record keys
+   * already present, dropping the record keys that are not present.
+   */
+  @Override
+  protected JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
+      JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext jsc,
+      final HoodieTable<T> hoodieTable) {
+    // Obtain records per partition, in the incoming records
+    Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
+    List<String> affectedPartitionPathList = new

[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-03-30 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r400425128
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieSimpleIndex.java
 ##
 @@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import com.clearspring.analytics.util.Lists;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A simple index which reads interested fields from parquet and joins with incoming records to find the tagged
+ * location.
+ *
+ * @param <T> type of the record payload
+ */
+public class HoodieSimpleIndex<T extends HoodieRecordPayload> extends HoodieBloomIndex<T> {
 
 Review comment:
   Will do.. We should really create a new abstract class `AbstractFileLevelIndex` which can be extended by the
BloomIndex and SimpleIndex classes.. HoodieIndex should be left alone, since indexes like the HBaseIndex do not
really work this way..


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-03-30 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r400424181
 
 

 ##
 File path: hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
 ##
 @@ -118,9 +121,10 @@ protected HoodieIndex(HoodieWriteConfig config) {
   /**
    * Each index type should implement its own logic to release any resources acquired during the process.
    */
-  public void close() {}
+  public void close() {
+  }
 
   public enum IndexType {
-    HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM
+    HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE
 
 Review comment:
   let's fix it in this PR itself? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-03-30 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r400427139
 
 

 ##
 File path: 
hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java
 ##
 @@ -79,9 +82,22 @@
 import static org.mockito.Mockito.when;
 
 @SuppressWarnings("unchecked")
+@RunWith(Parameterized.class)
 public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
 
   private static final Logger LOG = LogManager.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
+  private final IndexType indexType;
+
+  @Parameterized.Parameters(name = "{index}: Test with IndexType={0}")
+  public static Collection<Object[]> data() {
+    Object[][] data =
+        new Object[][] {{IndexType.BLOOM}, {IndexType.GLOBAL_BLOOM}, {IndexType.SIMPLE}};
 
 Review comment:
   how much does it increase by... doing the full suite with both indexes may be overkill IMO 
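
   If runtime is the concern, one option is to keep the cheap tagging tests parameterized across all index types
and gate only the expensive end-to-end cases on a single representative type. A sketch using JUnit 4's Assume,
placed inside the parameterized class above (the test name and body are hypothetical):

  // Runs for every parameterized IndexType, but the heavy assertions execute
  // only for the representative BLOOM type; the other types are skipped.
  @Test
  public void testFullUpsertSuite() {
    org.junit.Assume.assumeTrue("heavy suite runs only for the BLOOM index",
        indexType == IndexType.BLOOM);
    // ... expensive upsert/rollback assertions ...
  }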


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-03-30 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r400425599
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieSimpleIndex.java
 ##
 @@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A simple index which reads interested fields from parquet and joins with incoming records to find the tagged
+ * location.
+ *
+ * @param <T> type of the record payload
+ */
+public class HoodieSimpleIndex<T extends HoodieRecordPayload> extends HoodieBloomIndex<T> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSimpleIndex.class);
+
+  public HoodieSimpleIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  /**
+   * Returns an RDD mapping each HoodieKey to the partitionPath/fileId which contains it, or Option.empty() if the
+   * key is not found.
+   *
+   * @param hoodieKeys  keys to look up
+   * @param jsc         spark context
+   * @param hoodieTable hoodie table object
+   */
+  @Override
+  public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+      JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
+    JavaPairRDD<String, String> partitionRecordKeyPairRDD =
+        hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
+
+    // Lookup indexes for all the partition/recordkey pairs
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> recordKeyLocationRDD =
+        lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
+
+    JavaPairRDD<HoodieKey, String> keyHoodieKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key, null));
+
+    return keyHoodieKeyPairRDD.leftOuterJoin(recordKeyLocationRDD).mapToPair(keyLoc -> {
+      Option<Pair<String, String>> partitionPathFileidPair;
+      if (keyLoc._2._2.isPresent()) {
+        partitionPathFileidPair = Option.of(Pair.of(keyLoc._1().getPartitionPath(), keyLoc._2._2.get().getFileId()));
+      } else {
+        partitionPathFileidPair = Option.empty();
+      }
+      return new Tuple2<>(keyLoc._1, partitionPathFileidPair);
+    });
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+      HoodieTable<T> hoodieTable) {
+
+    // Step 0: cache the input record RDD
+    if (config.getBloomIndexUseCaching()) {
+      recordRDD.persist(config.getBloomIndexInputStorageLevel());
+    }
+
+    // Step 1: Extract out a thinner JavaPairRDD of (partitionPath, recordKey)
 
 Review comment:
   I have paused the other impl for now.. But let's get the abstractions right once and reuse.. I'm with you there


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,

[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-03-25 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r398223341
 
 

 ##
 File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
 ##
 @@ -103,6 +120,42 @@
 return rowKeys;
   }
 
+  /**
+   * Read the rows with record key and partition path from the given parquet file.
+   *
+   * @param filePath      the parquet file path
+   * @param configuration configuration to build fs object
+   * @return list of pairs of HoodieKey and the optional record location read from the file
+   */
+  public static List<Pair<HoodieKey, Option<HoodieRecordLocation>>> fetchRecordKeyPartitionPathFromParquet(
+      Configuration configuration, Path filePath, String baseInstantTime, String fileId) {
+    List<Pair<HoodieKey, Option<HoodieRecordLocation>>> rows = new ArrayList<>();
+    try {
+      if (!filePath.getFileSystem(configuration).exists(filePath)) {
+        return new ArrayList<>();
+      }
+      Configuration conf = new Configuration(configuration);
+      conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
+      Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
+      AvroReadSupport.setAvroReadSchema(conf, readSchema);
+      AvroReadSupport.setRequestedProjection(conf, readSchema);
+      ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(filePath).withConf(conf).build();
 
 Review comment:
   I think we can fix it in this patch itself.. it's a critical aspect 
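
   For reference, a self-contained sketch of the projected read this helper performs, written with
try-with-resources so the reader is always closed. The `_hoodie_record_key` and `_hoodie_partition_path` field
names are Hudi's standard meta columns; the hand-rolled projection schema here stands in for
HoodieAvroUtils.getRecordKeyPartitionPathSchema() and is an assumption, not the PR's code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;

public class ProjectedParquetReadSketch {

  // Read only the record-key and partition-path columns from a parquet file,
  // closing the reader deterministically with try-with-resources.
  public static List<String[]> readKeyAndPartitionPath(Configuration configuration, Path filePath)
      throws IOException {
    Schema projection = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"proj\",\"fields\":["
            + "{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"default\":null},"
            + "{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
    Configuration conf = new Configuration(configuration);
    AvroReadSupport.setAvroReadSchema(conf, projection);
    AvroReadSupport.setRequestedProjection(conf, projection);
    List<String[]> rows = new ArrayList<>();
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(filePath).withConf(conf).build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        rows.add(new String[] {String.valueOf(record.get("_hoodie_record_key")),
            String.valueOf(record.get("_hoodie_partition_path"))});
      }
    }
    return rows;
  }
}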


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-03-25 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r398222054
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieSimpleIndex.java
 ##
 @@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import com.clearspring.analytics.util.Lists;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A simple index which reads interested fields from parquet and joins with incoming records to find the tagged location
 
 Review comment:
   let's write the docs using higher-level abstractions like file groups/slices, base/log file? 
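
   A possible rewrite in those terms, only as a sketch of the wording rather than what the PR settled on:

/**
 * A simple index which loads the latest base file of every affected file group, reads the
 * (record key, partition path) fields stored in each file slice's base file, and joins them
 * with the incoming records to tag each record with its current location, if any.
 *
 * @param <T> type of the record payload
 */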


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-03-25 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r398221409
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 ##
 @@ -58,6 +58,11 @@
   public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE = BloomFilterTypeCode.SIMPLE.name();
   public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "hoodie.bloom.index.filter.dynamic.max.entries";
   public static final String DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "10";
+  public static final String SIMPLE_BLOOM_INDEX_USE_CACHING_PROP = "hoodie.simple.bloom.index.use.caching";
 
 Review comment:
   Calling it `SimpleBloom` when it does not use any bloom filters is very confusing.. Can we make it just SimpleIndex? 
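
   A sketch of the rename being suggested; these property names mirror the comment and are assumptions, not
necessarily what was merged:

// "bloom" dropped from the key, since the simple index uses no bloom filters
public static final String SIMPLE_INDEX_USE_CACHING_PROP = "hoodie.simple.index.use.caching";
public static final String DEFAULT_SIMPLE_INDEX_USE_CACHING = "true";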


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] Adding Simple Index

2020-03-25 Thread GitBox
vinothchandar commented on a change in pull request #1402: [WIP][HUDI-407] 
Adding Simple Index
URL: https://github.com/apache/incubator-hudi/pull/1402#discussion_r398222688
 
 

 ##
 File path: 
hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieSimpleIndex.java
 ##
 @@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import com.clearspring.analytics.util.Lists;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * A simple index which reads interested fields from parquet and joins with incoming records to find the tagged
+ * location.
+ *
+ * @param <T> type of the record payload
+ */
+public class HoodieSimpleIndex<T extends HoodieRecordPayload> extends HoodieBloomIndex<T> {
 
 Review comment:
   I think this is very confusing.. maybe we need to push a bunch of this up to the HoodieIndex abstract class, or
introduce a new class..


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services