[GitHub] [incubator-hudi] yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer

2020-03-11 Thread GitBox
yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source 
support for Hudi Delta Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#discussion_r390761706
 
 

 ##
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
 ##
 @@ -693,6 +699,146 @@ public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception
     testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
   }
 
+  private void prepareCsvDFSSource(
+      boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    String sourceRoot = dfsBasePath + "/csvFiles";
+    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";
+
+    // Properties used for testing delta-streamer with CSV source
+    TypedProperties csvProps = new TypedProperties();
+    csvProps.setProperty("include", "base.properties");
+    csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField);
+    csvProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    if (useSchemaProvider) {
+      csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source-flattened.avsc");
+      if (hasTransformer) {
+        csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target-flattened.avsc");
+      }
+    }
+    csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
+
+    if (sep != ',') {
+      if (sep == '\t') {
+        csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
+      } else {
+        csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep));
+      }
+    }
+    if (hasHeader) {
+      csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader));
+    }
+
+    UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV);
+
+    String path = sourceRoot + "/1.csv";
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    UtilitiesTestBase.Helpers.saveCsvToDFS(
+        hasHeader, sep,
+        Helpers.jsonifyRecords(dataGenerator.generateInserts("000", CSV_NUM_RECORDS, true)),
+        dfs, path);
+  }
+
+  private void testCsvDFSSource(
+      boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception {
+    prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null);
+    String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
+    String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
+    HoodieDeltaStreamer deltaStreamer =
+        new HoodieDeltaStreamer(TestHelpers.makeConfig(
+            tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
+            transformerClassName, PROPS_FILENAME_TEST_CSV, false,
+            useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    testNum++;
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files have header, the columns are separated by ',', the default separator
+    // No schema provider is specified, no transformer is applied
+    // In this case, the source schema comes from the inferred schema of the CSV files
+    testCsvDFSSource(true, ',', false, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files have header, the columns are separated by '\t',
+    // which is passed in through the Hudi CSV properties
+    // No schema provider is specified, no transformer is applied
+    // In this case, the source schema comes from the inferred schema of the CSV files
+    testCsvDFSSource(true, '\t', false, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files have header, the columns are separated by '\t'
+    // File schema provider is used, no transformer is applied
+    // In this case, the source schema comes from the source Avro schema file
+    testCsvDFSSource(true, '\t', true, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception {
+    // The CSV files have header, the columns are separated by '\t'
+    // No schema provider is specified, transformer is applied
+    // In this case, the source schema comes from the inferred schema of the CSV files.
+    // Target schema is determined based on the Dataframe after transformation
+    testCsvDFSSource(true, '\t', false, TripsWithDistanceTransformer.class.getName());
+  }
+
+  @Test
+ 
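
For quick reference, the properties file that prepareCsvDFSSource above ends up writing for the header + '\t' separator + schema-provider case would look roughly like this; the <dfsBasePath> placeholder is illustrative, standing in for the test's DFS base path:

    include=base.properties
    hoodie.datasource.write.recordkey.field=_row_key
    hoodie.datasource.write.partitionpath.field=not_there
    hoodie.deltastreamer.schemaprovider.source.schema.file=<dfsBasePath>/source-flattened.avsc
    hoodie.deltastreamer.source.dfs.root=<dfsBasePath>/csvFiles
    hoodie.deltastreamer.csv.sep=\t
    hoodie.deltastreamer.csv.header=true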

[GitHub] [incubator-hudi] yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer

2020-03-11 Thread GitBox
yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source 
support for Hudi Delta Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#discussion_r390763221
 
 

 ##
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.UtilitiesTestBase;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Basic tests for {@link CsvDFSSource}.
 
 Review comment:
   Actually, this class runs the tests defined in `AbstractDFSSourceTestBase`, with the CSV-source-specific logic implemented in this class.
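
For readers skimming the thread, the pattern is roughly the sketch below. The hook names are assumptions for illustration (the actual method names in `AbstractDFSSourceTestBase` may differ); only `CsvDFSSource` and the helper calls are taken from the PR itself.

    // Sketch only: prepareDFSSource/writeNewDataToFile are hypothetical hook names.
    public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
      @Override
      protected Source prepareDFSSource() {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
        return new CsvDFSSource(props, jsc, sparkSession, schemaProvider);
      }

      @Override
      protected void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
        // Reuse the CSV helper so the base class's shared test cases can generate input files
        UtilitiesTestBase.Helpers.saveCsvToDFS(
            true, ',', Helpers.jsonifyRecords(records), dfs, path.toString());
      }
    }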




[GitHub] [incubator-hudi] yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer

2020-03-11 Thread GitBox
yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source 
support for Hudi Delta Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#discussion_r390760597
 
 

 ##
 File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.avro.SchemaConverters;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Reads data from CSV files on DFS as the data source.
+ *
+ * Internally, we use Spark to read CSV files; thus, any limitation of Spark CSV also applies here
+ * (e.g., limited support for nested schemas).
+ *
+ * You can set the CSV-specific configs, in the format of hoodie.deltastreamer.csv.*, that are
+ * Spark compatible to deal with CSV files in Hudi.  The supported options are:
+ *
+ *   "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
+ *   "header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace",
+ *   "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf",
+ *   "negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn",
+ *   "mode", "columnNameOfCorruptRecord", "multiLine"
+ *
+ * Detailed information about these CSV options can be found at:
+ * https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html#csv-scala.collection.Seq-
+ *
+ * If the source Avro schema is provided through the {@link org.apache.hudi.utilities.schema.FilebasedSchemaProvider}
+ * using the "hoodie.deltastreamer.schemaprovider.source.schema.file" config, the schema is
+ * passed to the CSV reader without inferring the schema from the CSV file.
+ */
+public class CsvDFSSource extends RowSource {
+  // CsvSource config prefix
+  public static final String CSV_SRC_CONFIG_PREFIX = "hoodie.deltastreamer.csv.";
+  // CSV-specific configurations to pass in from Hudi to Spark
+  public static final List<String> CSV_CONFIG_KEYS = Arrays.asList(
+      "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
+      "header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace",
+      "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf",
+      "negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn",
+      "mode", "columnNameOfCorruptRecord", "multiLine"
+  );
+
+  private final DFSPathSelector pathSelector;
+  private final StructType sourceSchema;
+
+  public CsvDFSSource(TypedProperties props,
+      JavaSparkContext sparkContext,
+      SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
+    if (overriddenSchemaProvider != null) {
+      sourceSchema = (StructType) SchemaConverters.toSqlType(overriddenSchemaProvider.getSourceSchema()).dataType();
 
 Review comment:
   Good point.  Done.
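
As a usage sketch of the prefix mechanism described in the Javadoc above (option names come from the documented list; the values and the schema path are illustrative):

    TypedProperties props = new TypedProperties();
    // Forwarded to the Spark CSV reader as option("sep", "\t") / option("header", "true")
    props.setProperty("hoodie.deltastreamer.csv.sep", "\t");
    props.setProperty("hoodie.deltastreamer.csv.header", "true");
    // With a file-based schema provider, the source schema is passed to the
    // CSV reader and schema inference is skipped
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", "/path/to/source.avsc");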




[GitHub] [incubator-hudi] yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer

2020-03-11 Thread GitBox
yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source 
support for Hudi Delta Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#discussion_r390761144
 
 

 ##
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
 ##
 @@ -653,7 +659,7 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
     if (useSchemaProvider) {
       parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
       if (hasTransformer) {
-        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");
+        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
 
 Review comment:
   I don't remember fixing unit tests.  Given that this config is optional, it is possible that the data written may differ from the designated schema.  However, I think the integration tests should be able to catch any issue due to schema mismatch.




[GitHub] [incubator-hudi] yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer

2020-03-11 Thread GitBox
yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source 
support for Hudi Delta Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#discussion_r390762093
 
 

 ##
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
 ##
 @@ -193,19 +204,60 @@ public static void saveStringsToDFS(String[] lines, FileSystem fs, String target
       os.close();
     }
 
+    /**
+     * Converts the JSON records into CSV format and writes to a file.
+     *
+     * @param hasHeader  whether the CSV file should have a header line.
+     * @param sep  the column separator to use.
+     * @param lines  the records in JSON format.
+     * @param fs  {@link FileSystem} instance.
+     * @param targetPath  File path.
+     * @throws IOException
+     */
+    public static void saveCsvToDFS(
+        boolean hasHeader, char sep,
+        String[] lines, FileSystem fs, String targetPath) throws IOException {
+      Builder csvSchemaBuilder = CsvSchema.builder();
+
+      ArrayNode arrayNode = mapper.createArrayNode();
+      Arrays.stream(lines).forEachOrdered(
+          line -> {
+            try {
+              arrayNode.add(mapper.readValue(line, ObjectNode.class));
+            } catch (IOException e) {
+              e.printStackTrace();
 Review comment:
   This should not happen, but I agree that we can throw an exception here to catch any conversion issues.  Note that this is only used in test code.
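
A minimal sketch of that suggestion, assuming the surrounding code stays as in the diff above; UncheckedIOException is just one reasonable way to rethrow from a lambda that cannot declare checked exceptions:

    Arrays.stream(lines).forEachOrdered(line -> {
      try {
        arrayNode.add(mapper.readValue(line, ObjectNode.class));
      } catch (IOException e) {
        // Fail the test fast instead of swallowing the conversion error
        throw new UncheckedIOException("Failed to parse JSON record: " + line, e);
      }
    });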




[GitHub] [incubator-hudi] yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer

2020-02-25 Thread GitBox
yihua commented on a change in pull request #1165: [HUDI-76] Add CSV Source 
support for Hudi Delta Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#discussion_r384313736
 
 

 ##
 File path: hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
 ##
 @@ -74,20 +74,30 @@
   public static final String[] DEFAULT_PARTITION_PATHS =
       {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
   public static final int DEFAULT_PARTITION_DEPTH = 3;
-  public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+  public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
       + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
-      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
-      + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
-      + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
-      + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
+      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},";
+  public static final String TRIP_SCHEMA_SUFFIX = "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
+  public static final String FARE_NESTED_SCHEMA = "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+      + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},";
+  public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"},"
+      + "{\"name\": \"currency\", \"type\": \"string\"},";
+
+  public static final String TRIP_EXAMPLE_SCHEMA =
+      TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+  public static final String TRIP_FLATTENED_SCHEMA =
 
 Review comment:
   Yes, for the CSV format, nested schemas are not well supported.  So to test the CSV source, we need to generate the test CSV data with a flattened schema.
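
To make the contrast concrete, here is what the two fare representations assembled by the constants above look like as Avro field definitions (reconstructed from the diff; TRIP_FLATTENED_SCHEMA presumably composes TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX, though the hunk is cut off before that assignment completes):

    // FARE_NESTED_SCHEMA: a nested record, which Spark CSV cannot map to a single column
    {"name": "fare", "type": {"type": "record", "name": "fare", "fields": [
      {"name": "amount", "type": "double"}, {"name": "currency", "type": "string"}]}},

    // FARE_FLATTENED_SCHEMA: one top-level primitive column per value
    {"name": "fare", "type": "double"},
    {"name": "currency", "type": "string"},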

