bvaradar commented on a change in pull request #1678:
URL: https://github.com/apache/hudi/pull/1678#discussion_r435985550



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap.selector;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+public class BootstrapRegexModeSelector extends BootstrapModeSelector {

Review comment:
       For mixed-mode bootstrapping, this class can be used to select a few partitions for full bootstrap while configuring metadata bootstrap for the others.
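
   For illustration, a hedged sketch of how a user might wire this up via write-config properties — `hoodie.bootstrap.mode.selector` is the key added in HoodieBootstrapConfig in this PR, while the regex and regex-mode keys shown are assumptions for illustration, not confirmed by this diff:

```java
import java.util.Properties;

// Hedged sketch; the two ".regex" keys below are assumed names, not from this diff.
static Properties mixedModeBootstrapProps() {
  Properties props = new Properties();
  // Plug in the regex-based selector (class added in this PR).
  props.setProperty("hoodie.bootstrap.mode.selector",
      "org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector");
  // Partitions matching the regex get the configured mode; the rest get the
  // other mode (METADATA_ONLY_BOOTSTRAP vs FULL_BOOTSTRAP).
  props.setProperty("hoodie.bootstrap.mode.selector.regex", "2020/06/.*");          // assumed key
  props.setProperty("hoodie.bootstrap.mode.selector.regex.mode", "FULL_BOOTSTRAP"); // assumed key
  return props;
}
```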

##########
File path: .travis.yml
##########
@@ -13,14 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+os: linux
 language: java
 dist: trusty
 jdk:
   - openjdk8
-sudo: required
-env:
-  - HUDI_QUIETER_LOGGING=1 TEST_SUITE=unit
-  - TEST_SUITE=integration
+jobs:

Review comment:
       Seeing timeouts in CI runs. Went down a rabbit hole trying to reuse the Spark context; ended up splitting the CI run into hudi-client and the rest. This is from #1619.

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapWriteStatus.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.BootstrapSourceFileMapping;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * WriteStatus for Bootstrap.
+ */
+public class BootstrapWriteStatus extends WriteStatus {

Review comment:
       Used for metadata bootstrap. Contains the information needed to create the bootstrap index.

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -146,6 +146,35 @@ public static SparkConf registerClasses(SparkConf conf) {
     return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
   }
 
+  /**
+   * Main API to run bootstrap to hudi.
+   */
+  public void bootstrap(Option<Map<String, String>> extraMetadata) {
+    if (rollbackPending) {
+      rollBackPendingBootstrap();
+    }
+    HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
+    table.bootstrap(jsc, extraMetadata);
+  }
+
+  /**
+   * Main API to rollback pending bootstrap.
+   */
+  protected void rollBackPendingBootstrap() {

Review comment:
       Internal method made visible for testing. Bootstrap rollback is special, as it may have to revert two commits (when both metadata and full bootstrap are configured).
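
   To make the two-commit shape concrete, one plausible sketch — `rollback(String)` is the existing write-client API, but the ordering shown here is an assumption for illustration, not the actual implementation:

```java
// Hedged sketch: a pending bootstrap can leave up to two inflight commits at the
// fixed bootstrap timestamps, and both must be reverted. Rolling back the later
// full-bootstrap commit before the metadata-bootstrap commit is assumed here.
protected void rollBackPendingBootstrap() {
  rollback(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);     // only present when full bootstrap is configured
  rollback(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS); // the metadata bootstrap commit
}
```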

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPathTranslator;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Bootstrap specific configs.
+ */
+public class HoodieBootstrapConfig extends DefaultHoodieConfig {
+
+  public static final String SOURCE_BASE_PATH_PROP = "hoodie.bootstrap.source.base.path";

Review comment:
       The base path of the external source data.

##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;

Review comment:
       Moved the base class to hudi-client, as it is needed for metadata bootstrap.

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/bootstrap/translator/MetadataBootstrapPartitionPathTranslator.java
##########
@@ -16,28 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.keygen;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-
-import org.apache.avro.generic.GenericRecord;
+package org.apache.hudi.client.bootstrap.translator;
 
 import java.io.Serializable;
+import org.apache.hudi.common.config.TypedProperties;
 
-/**
- * Abstract class to extend for plugging in extraction of {@link HoodieKey} from an Avro record.
- */
-public abstract class KeyGenerator implements Serializable {
+public abstract class MetadataBootstrapPartitionPathTranslator implements Serializable {

Review comment:
       HUDI-1001 tracks adding implementations.

##########
File path: hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
##########
@@ -52,11 +53,18 @@
   private static final String COW_TABLE_NAME = "stock_ticks_cow";
   private static final String MOR_TABLE_NAME = "stock_ticks_mor";
 
+  private static final String BOOTSTRAPPED_SRC_PATH = "/user/hive/warehouse/stock_ticks_cow_bs_src";

Review comment:
       What we do here: once the first round of ingestion completes for the COW and MOR tables, we create a new parquet dataset by reading from the COW table. We then use this new parquet dataset to perform metadata bootstrap onto a new COW table and a new MOR table. Hive and Spark SQL queries are run against these tables. We also perform upserts and compaction (MOR) on these metadata-bootstrapped tables and verify through Hive and Spark SQL. Presto coverage needs to be added once we support it.

##########
File path: packaging/hudi-hadoop-mr-bundle/pom.xml
##########
@@ -72,9 +72,19 @@
                   <include>org.objenesis:objenesis</include>
                   <include>com.esotericsoftware:minlog</include>
                   <include>org.apache.avro:avro</include>
+                  <include>org.apache.hbase:hbase-common</include>

Review comment:
       We need to add all of these to the bundles in order to include HFile support at runtime across query engines (fyi: @prashantwason).

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -122,20 +145,37 @@ public void shutdownGracefully() {
    * @throws Exception
    */
   public void sync() throws Exception {
-    if (cfg.continuousMode) {
-      deltaSyncService.start(this::onDeltaSyncShutdown);
-      deltaSyncService.waitForShutdown();
-      LOG.info("Delta Sync shutting down");
+    if (bootstrapExecutor.isPresent()) {

Review comment:
       Users are expected to run the bootstrap executor in standalone mode. They can then switch to continuous mode for subsequent ingestion.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -81,6 +81,13 @@
 
   String INVALID_INSTANT_TS = "0";
 
+  // Instant corresponding to pristine state of the table after its creation

Review comment:
       I am using values higher than INIT_INSTANT_TS for the bootstrap instants, so that we get uniform semantics when incrementally reading bootstrap vs non-bootstrap commits.
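
   For context, a sketch of the intended ordering — only INVALID_INSTANT_TS appears in this hunk; the concrete literals below are assumptions that just need to sort between INIT_INSTANT_TS and any real commit time:

```java
// Hedged sketch of the timestamp ordering; the exact literals are assumptions.
String INIT_INSTANT_TS               = "00000000000000"; // pristine table state
String METADATA_BOOTSTRAP_INSTANT_TS = "00000000000001"; // > INIT_INSTANT_TS
String FULL_BOOTSTRAP_INSTANT_TS     = "00000000000002"; // > METADATA_BOOTSTRAP_INSTANT_TS
// Any regular commit time (yyyyMMddHHmmss) sorts after all of these, so an
// incremental read starting at INIT_INSTANT_TS sees bootstrap commits like any other.
```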

##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
##########
@@ -164,7 +164,7 @@ public void testOverwriteHoodieProperties() throws IOException {
 
     // check result
     List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type",
-        "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
+        "hoodie.archivelog.folder", "hoodie.bootstrap.index.class", "hoodie.timeline.layout.version");

Review comment:
       The bootstrap index implementation is pluggable, per a request from AWS.

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPathTranslator;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Bootstrap specific configs.
+ */
+public class HoodieBootstrapConfig extends DefaultHoodieConfig {
+
+  public static final String SOURCE_BASE_PATH_PROP = "hoodie.bootstrap.source.base.path";
+  public static final String BOOTSTRAP_MODE_SELECTOR = "hoodie.bootstrap.mode.selector";
+  public static final String FULL_BOOTRAP_INPUT_PROVIDER = "hoodie.bootstrap.full.input.provider";
+  public static final String BOOTSTRAP_KEYGEN_CLASS = "hoodie.bootstrap.keygen.class";

Review comment:
       Note: the same keygen class needs to be configured as the one used for regular upserts, to ensure consistent upsert behavior after bootstrap.
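
   Concretely, something like this — `hoodie.bootstrap.keygen.class` is from this PR, while the datasource keygen option key is an assumption here:

```java
import java.util.Properties;

// Hedged sketch: point both knobs at the same generator so keys written during
// bootstrap match keys computed by later upserts.
static Properties consistentKeygenProps() {
  Properties props = new Properties();
  props.setProperty("hoodie.bootstrap.keygen.class",
      "org.apache.hudi.keygen.SimpleKeyGenerator");
  props.setProperty("hoodie.datasource.write.keygenerator.class",  // assumed option key
      "org.apache.hudi.keygen.SimpleKeyGenerator");
  return props;
}
```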

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/utils/MergingParquetIterator.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import java.util.Iterator;
+import java.util.function.Function;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+public class MergingParquetIterator<T extends GenericRecord> implements Iterator<T> {

Review comment:
       Used to provide stitched records from the older file slice for upsert/compaction on a metadata-bootstrapped table.
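
   A plausible reconstruction of the merging shape, inferred from the imports and signature in this hunk (field and parameter names are assumed; the actual body is not shown):

```java
import java.util.Iterator;
import java.util.function.Function;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.collection.Pair;

// Zip the skeleton file's records (hoodie metadata columns) with the source file's
// records (data columns) and stitch each pair into one complete record.
public class MergingParquetIterator<T extends GenericRecord> implements Iterator<T> {
  private final Iterator<T> skeletonIterator; // skeleton-file reader
  private final Iterator<T> sourceIterator;   // external source-file reader
  private final Function<Pair<T, T>, T> mergeFunction;

  public MergingParquetIterator(Iterator<T> skeleton, Iterator<T> source,
      Function<Pair<T, T>, T> mergeFunction) {
    this.skeletonIterator = skeleton;
    this.sourceIterator = source;
    this.mergeFunction = mergeFunction;
  }

  @Override
  public boolean hasNext() {
    // Skeleton and source files have identical row counts by construction (1-1 mapping).
    return skeletonIterator.hasNext() && sourceIterator.hasNext();
  }

  @Override
  public T next() {
    return mergeFunction.apply(Pair.of(skeletonIterator.next(), sourceIterator.next()));
  }
}
```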

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -173,23 +173,26 @@ protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD,
   protected void commitOnAutoCommit(HoodieWriteMetadata result) {
     if (config.shouldAutoCommit()) {
       LOG.info("Auto commit enabled: Committing " + instantTime);
-      commit(Option.empty(), result);
+      commit(extraMetadata, result);
     } else {
       LOG.info("Auto commit disabled for " + instantTime);
     }
   }
 
-  private void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {

Review comment:
       Metadata bootstrap needs the write statuses for creating the bootstrap index, but we also need to ensure collect() on the WriteStatus RDD is done only once. Hence this change.

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPathTranslator;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Bootstrap specific configs.
+ */
+public class HoodieBootstrapConfig extends DefaultHoodieConfig {
+
+  public static final String SOURCE_BASE_PATH_PROP = "hoodie.bootstrap.source.base.path";
+  public static final String BOOTSTRAP_MODE_SELECTOR = "hoodie.bootstrap.mode.selector";

Review comment:
       Configures the class that decides which partitions get metadata bootstrap and which get full bootstrap.
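
   A sketch of the selector contract as implied by its use in BootstrapCommitActionExecutor below — the abstract method signature is inferred from that call site, not confirmed by this hunk:

```java
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

public abstract class BootstrapModeSelector implements Serializable {
  protected final HoodieWriteConfig writeConfig;

  protected BootstrapModeSelector(HoodieWriteConfig writeConfig) {
    this.writeConfig = writeConfig;
  }

  // Bucket each listed source partition into a BootstrapMode
  // (METADATA_ONLY_BOOTSTRAP vs FULL_BOOTSTRAP).
  public abstract Map<BootstrapMode, List<String>> select(
      List<Pair<String, List<HoodieFileStatus>>> partitions);
}
```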

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -118,14 +119,17 @@
       "_.hoodie.allow.multi.write.on.same.instant";
  private static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
 
+  public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_SCHEMA + ".externalTransformation";

Review comment:
       Parquet-avro has a bug when dealing with nested fields that are lists: parquet-avro changes the schema field name of the nested field.
   
   Schema used to write: https://gist.github.com/bvaradar/fb7af49258b8bfc232607a06d3adc8db#file-writer-schema-nested-array-schema-L70
   Schema returned by parquet-avro: https://gist.github.com/bvaradar/c69d9af4cc7b62bf81e4cf938d0ba9ca#file-same-schema-when-read-from-parquet-L72
   
   Using explicit record translation to the new schema in Hudi gets it to work. The logic is in MergeHelper.java.
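
   Roughly, the translation looks like this — a hedged sketch, not the actual MergeHelper code; HoodieAvroUtils.rewriteRecord is the existing helper:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

// Rebuild each record read via parquet-avro into the original writer schema so the
// renamed nested-list fields from the parquet-avro round trip never leak downstream.
static GenericRecord toWriterSchema(GenericRecord readRecord, Schema writerSchema) {
  return HoodieAvroUtils.rewriteRecord(readRecord, writerSchema);
}
```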

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+public class HoodieBootstrapHandle<T extends HoodieRecordPayload> extends HoodieCreateHandle<T> {

Review comment:
       Used by metadata bootstrap to ensure the schema is set correctly and that skeleton and external files map 1-1.

##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+
+public class KeyGenUtils {

Review comment:
       Most KeyGenerator classes have boilerplate code; refactored to ensure it is handled consistently. Also, I needed separate APIs to get only the record key (not the partition path) for metadata bootstrap, and made changes in the KeyGenerator classes for that.
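
   A simplified sketch of the kind of boilerplate being centralized — the method shape here is illustrative; the real helpers also handle nested fields and URL-encoding:

```java
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.exception.HoodieKeyException;

// Extract only the record key — no partition path — as metadata bootstrap needs.
public static String getRecordKey(GenericRecord record, List<String> recordKeyFields) {
  StringBuilder recordKey = new StringBuilder();
  for (int i = 0; i < recordKeyFields.size(); i++) {
    String field = recordKeyFields.get(i);
    Object value = record.get(field); // real code resolves nested fields via HoodieAvroUtils
    if (value == null || value.toString().isEmpty()) {
      throw new HoodieKeyException("recordKey value for field \"" + field + "\" cannot be null or empty");
    }
    recordKey.append(field).append(':').append(value);
    if (i < recordKeyFields.size() - 1) {
      recordKey.append(',');
    }
  }
  return recordKey.toString();
}
```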

##########
File path: hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
##########
@@ -124,13 +125,15 @@ public String fileSizeStats(
     Histogram globalHistogram = new Histogram(new UniformReservoir(MAX_FILES));
     HashMap<String, Histogram> commitHistoMap = new HashMap<>();
     for (FileStatus fileStatus : statuses) {
-      String instantTime = FSUtils.getCommitTime(fileStatus.getPath().getName());
-      long sz = fileStatus.getLen();
-      if (!commitHistoMap.containsKey(instantTime)) {
-        commitHistoMap.put(instantTime, new Histogram(new UniformReservoir(MAX_FILES)));
+      if (!fileStatus.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) {

Review comment:
       CLI tests caught this issue!! The .bootstrap folder is under .hoodie, which shows up in this listing.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
##########
@@ -155,23 +161,10 @@ public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstan
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());
-      BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-      try (ParquetReader<IndexedRecord> reader =
-          AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(getHadoopConf()).build()) {
-        wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
-            new UpdateHandler(upsertHandle), x -> x);
-        wrapper.execute();
-      } catch (Exception e) {
-        throw new HoodieException(e);
-      } finally {
-        upsertHandle.close();
-        if (null != wrapper) {
-          wrapper.shutdownNow();
-        }
-      }
+      MergeHelper.runMerge(this, upsertHandle);

Review comment:
       Moved to avoid duplicating merge handling between compaction and COW upsert.

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPathTranslator;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Bootstrap specific configs.
+ */
+public class HoodieBootstrapConfig extends DefaultHoodieConfig {
+
+  public static final String SOURCE_BASE_PATH_PROP = "hoodie.bootstrap.source.base.path";
+  public static final String BOOTSTRAP_MODE_SELECTOR = "hoodie.bootstrap.mode.selector";
+  public static final String FULL_BOOTRAP_INPUT_PROVIDER = "hoodie.bootstrap.full.input.provider";

Review comment:
       Configures the class that provides the input RDD given a list of partitions. Defaults are provided at the DeltaStreamer/Spark datasource level.
   
   @vinothchandar: This doesn't have to be configurable, but it is kept this way as dependency injection to avoid moving SparkDataSourceBasedFullBootstrapInputProvider and all its dependencies from hudi-spark to hudi-client. Would need a discussion to come up with a cleaner approach.
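
   For reference, the provider contract as used by BootstrapCommitActionExecutor below — the abstract class shape is inferred from that call site (reflective construction with (TypedProperties, JavaSparkContext)), not confirmed by this hunk:

```java
import java.io.Serializable;
import java.util.List;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public abstract class FullBootstrapInputProvider implements Serializable {
  protected final TypedProperties props;
  protected final transient JavaSparkContext jsc;

  protected FullBootstrapInputProvider(TypedProperties props, JavaSparkContext jsc) {
    this.props = props;
    this.jsc = jsc;
  }

  // Given the source base path and per-partition file listings, produce the
  // HoodieRecord RDD to bulk-insert for the full-bootstrap partitions.
  public abstract JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName,
      String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionFilesList);
}
```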

##########
File path: hudi-cli/pom.xml
##########
@@ -202,6 +237,12 @@
       <type>test-jar</type>
     </dependency>
 
+    <dependency>

Review comment:
       hudi-utilities-bundle shades the hbase classes, which also end up in the bootstrap index file itself (the key class is recorded in the footer), so we need to use the bundle even in the CLI when doing any operation involving a bootstrapped table.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java
##########
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.bootstrap;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.bootstrap.MetadataBootstrapKeyGenerator;
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
+import org.apache.hudi.client.bootstrap.BootstrapSourceSchemaProvider;
+import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
+import org.apache.hudi.client.bootstrap.FullBootstrapInputProvider;
+import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
+import org.apache.hudi.client.utils.ParquetReaderIterator;
+import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BootstrapSourceFileMapping;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
+import org.apache.hudi.io.HoodieBootstrapHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
+import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hudi.table.action.commit.CommitActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class BootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends BaseCommitActionExecutor<T, HoodieBootstrapWriteMetadata> {
+
+  private static final Logger LOG = LogManager.getLogger(BootstrapCommitActionExecutor.class);
+  protected String bootstrapSchema = null;
+  private transient FileSystem bootstrapSourceFileSystem;
+
+  public BootstrapCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table,
+      Option<Map<String, String>> extraMetadata) {
+    super(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps())
+        .withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
+        .withBulkInsertParallelism(config.getBootstrapParallelism())
+        .build(), table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP,
+        extraMetadata);
+    bootstrapSourceFileSystem = FSUtils.getFs(config.getBootstrapSourceBasePath(), hadoopConf);
+  }
+
+  private void checkArguments() {
+    ValidationUtils.checkArgument(config.getBootstrapSourceBasePath() != null,
+        "Ensure Bootstrap Source Path is set");
+    ValidationUtils.checkArgument(config.getBootstrapModeSelectorClass() != null,
+        "Ensure Bootstrap Partition Selector is set");
+    ValidationUtils.checkArgument(config.getBootstrapKeyGeneratorClass() != null,
+        "Ensure bootstrap key generator class is set");
+  }
+
+  @Override
+  public HoodieBootstrapWriteMetadata execute() {
+    checkArguments();
+    try {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      Option<HoodieInstant> completetedInstant =
+          metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+      ValidationUtils.checkArgument(!completetedInstant.isPresent(),
+          "Active Timeline is expected to be empty for bootstrapped to be performed. "
+              + "If you want to re-bootstrap, please rollback bootstrap first !!");
+      Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections =
+          listAndProcessSourcePartitions(metaClient);
+
+      // First run metadata bootstrap which will implicitly commit
+      HoodieWriteMetadata metadataResult = metadataBootstrap(partitionSelections.get(
+          BootstrapMode.METADATA_ONLY_BOOTSTRAP));
+      // if there are full bootstrap to be performed, perform that too
+      HoodieWriteMetadata fullBootstrapResult =
+          fullBootstrap(partitionSelections.get(BootstrapMode.FULL_BOOTSTRAP));
+      return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  protected String getSchemaToStoreInCommit() {
+    return bootstrapSchema;
+  }
+
+  /**
+   * Perform Metadata Bootstrap.
+   * @param partitionFilesList List of partitions and files within that partitions
+   */
+  protected HoodieWriteMetadata metadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
+    if (null == partitionFilesList || partitionFilesList.isEmpty()) {
+      return null;
+    }
+
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    metaClient.getActiveTimeline().createNewInstant(
+        new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(),
+            HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
+
+    table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
+        metaClient.getCommitActionType(), HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS), Option.empty());
+
+    JavaRDD<BootstrapWriteStatus> bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList);
+
+    HoodieWriteMetadata result = new HoodieWriteMetadata();
+    updateIndexAndCommitIfNeeded(bootstrapWriteStatuses.map(w -> w), result);
+    return result;
+  }
+
+  @Override
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
+    // Perform bootstrap index write and then commit. Make sure both record-key and bootstrap-index
+    // is all done in a single job DAG.
+    Map<String, List<Pair<BootstrapSourceFileMapping, HoodieWriteStat>>> bootstrapSourceAndStats =
+        result.getWriteStatuses().collect().stream()
+            .map(w -> {
+              BootstrapWriteStatus ws = (BootstrapWriteStatus) w;
+              return Pair.of(ws.getBootstrapSourceFileMapping(), ws.getStat());
+            }).collect(Collectors.groupingBy(w -> w.getKey().getHudiPartitionPath()));
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    try (BootstrapIndex.IndexWriter indexWriter = BootstrapIndex.getBootstrapIndex(metaClient)
+        .createWriter(config.getBootstrapSourceBasePath())) {
+      LOG.info("Starting to write bootstrap index for source " + config.getBootstrapSourceBasePath() + " in table "
+          + config.getBasePath());
+      indexWriter.begin();
+      bootstrapSourceAndStats.forEach((key, value) -> indexWriter.appendNextPartition(key,
+          value.stream().map(w -> w.getKey()).collect(Collectors.toList())));
+      indexWriter.finish();
+      LOG.info("Finished writing bootstrap index for source " + config.getBootstrapSourceBasePath() + " in table "
+          + config.getBasePath());
+    }
+
+    super.commit(extraMetadata, result, bootstrapSourceAndStats.values().stream()
+        .flatMap(f -> f.stream().map(Pair::getValue)).collect(Collectors.toList()));
+    LOG.info("Done Committing metadata bootstrap !!");
+  }
+
+  /**
+   * Perform Metadata Bootstrap.
+   * @param partitionFilesList List of partitions and files within that partitions
+   */
+  protected HoodieWriteMetadata fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
+    if (null == partitionFilesList || partitionFilesList.isEmpty()) {
+      return null;
+    }
+    TypedProperties properties = new TypedProperties();
+    properties.putAll(config.getProps());
+    FullBootstrapInputProvider inputProvider =
+        (FullBootstrapInputProvider) ReflectionUtils.loadClass(config.getFullBootstrapInputProvider(),
+            properties, jsc);
+    JavaRDD<HoodieRecord> inputRecordsRDD =
+        inputProvider.generateInputRecordRDD("bootstrap_source", config.getBootstrapSourceBasePath(),
+            partitionFilesList);
+    // Start Full Bootstrap
+    final HoodieInstant requested = new HoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),
+        HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
+    table.getActiveTimeline().createNewInstant(requested);
+    // Setup correct schema and run bulk insert.
+    return getBulkInsertActionExecutor(inputRecordsRDD).execute();
+  }
+
+  protected CommitActionExecutor<T> getBulkInsertActionExecutor(JavaRDD<HoodieRecord> inputRecordsRDD) {
+    return new BulkInsertCommitActionExecutor(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps())
+        .withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+        inputRecordsRDD, extraMetadata);
+  }
+
+  private BootstrapWriteStatus handleMetadataBootstrap(String srcPartitionPath, String partitionPath,
+      HoodieFileStatus srcFileStatus, MetadataBootstrapKeyGenerator keyGenerator) {
+
+    Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
+    HoodieBootstrapHandle bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
+        table, partitionPath, FSUtils.createNewFileIdPfx(), table.getSparkTaskContextSupplier());
+    Schema avroSchema = null;
+    try {
+      ParquetMetadata readFooter = ParquetFileReader.readFooter(table.getHadoopConf(), sourceFilePath,
+          ParquetMetadataConverter.NO_FILTER);
+      MessageType parquetSchema = readFooter.getFileMetaData().getSchema();
+      avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+      Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema,
+          keyGenerator.getTopLevelRecordKeyFields());
+      LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
+      AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
+      AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);
+
+      BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;
+      try (ParquetReader<IndexedRecord> reader =
+          AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build()) {
+        wrapper = new SparkBoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config,
+            new ParquetReaderIterator(reader), new BootstrapRecordWriter(bootstrapHandle), inp -> {
+          String recKey = keyGenerator.getRecordKey(inp);
+          GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
+          gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
+          BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
+          HoodieRecord rec = new HoodieRecord(new HoodieKey(recKey, partitionPath), payload);
+          return rec;
+        });
+        wrapper.execute();
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      } finally {
+        bootstrapHandle.close();
+        if (null != wrapper) {
+          wrapper.shutdownNow();
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+    BootstrapWriteStatus writeStatus = (BootstrapWriteStatus) bootstrapHandle.getWriteStatus();
+    BootstrapSourceFileMapping bootstrapSourceFileMapping = new BootstrapSourceFileMapping(
+        config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
+        srcFileStatus, writeStatus.getFileId());
+    writeStatus.setBootstrapSourceFileMapping(bootstrapSourceFileMapping);
+    return writeStatus;
+  }
+
+  /**
+   * Return Bootstrap Mode selections for partitions listed and figure out bootstrap Schema.
+   * @param metaClient Hoodie Table Meta Client.
+   * @return
+   * @throws IOException
+   */
+  private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions(
+      HoodieTableMetaClient metaClient) throws IOException {
+    //TODO: Added HoodieFilter for manually testing bootstrap from source hudi table. Needs to be reverted.
+    final PathFilter hoodieFilter = new HoodieROTablePathFilter();
+    List<Pair<String, List<HoodieFileStatus>>> folders =

Review comment:
       HUDI-999 tracks parallelizing this step.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileStatusDTO.java
##########
@@ -68,7 +68,7 @@ public static FileStatusDTO fromFileStatus(FileStatus fileStatus) {
       dto.blockReplication = fileStatus.getReplication();
       dto.blocksize = fileStatus.getBlockSize();
       dto.modificationTime = fileStatus.getModificationTime();
-      dto.accessTime = fileStatus.getModificationTime();
+      dto.accessTime = fileStatus.getAccessTime();

Review comment:
       FYI: this is a pre-existing bug (accessTime was being set from modificationTime), but it has no impact currently.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

