This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 635cd554e94e2c87858785f7e1227aa804eedc5e
Author: Gen Luo <luogen...@gmail.com>
AuthorDate: Tue Jun 25 19:40:48 2019 +0800

    [FLINK-12977][table] Port CsvTableSource to api-java-bridge
    
    This closes #8872
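    
    The ported Java class keeps the builder-style API of the removed Scala
    source. A minimal usage sketch (the file path, schema, and the tableEnv
    variable below are illustrative only, not part of this change):
    
        import org.apache.flink.table.api.Types;
        import org.apache.flink.table.sources.CsvTableSource;
    
        CsvTableSource source = CsvTableSource.builder()
            .path("/tmp/people.csv")          // hypothetical path
            .field("name", Types.STRING)
            .field("age", Types.INT)
            .ignoreFirstLine()
            .build();
        // tableEnv: an existing TableEnvironment
        tableEnv.registerTableSource("people", source);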
---
 .../apache/flink/table/sources/CsvTableSource.java | 431 +++++++++++++++++++++
 .../flink/table/api/StreamTableEnvironment.scala   |   2 +-
 .../runtime/batch/sql/TableSourceITCase.scala      |  22 +-
 .../runtime/stream/sql/TableSourceITCase.scala     |  24 +-
 .../apache/flink/table/util/testTableSources.scala |  45 +++
 .../flink/table/sources/CsvTableSource.scala       | 364 -----------------
 .../runtime/stream/sql/TableSourceITCase.scala     |   2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |  25 +-
 8 files changed, 532 insertions(+), 383 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
new file mode 100644
index 0000000..160bc9a
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource
+       implements StreamTableSource<Row>, BatchTableSource<Row>, ProjectableTableSource<Row> {
+
+       private final CsvInputFormatConfig config;
+
+       /**
+        * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with
+        * a (logically) unlimited number of fields.
+        *
+        * @param path       The path to the CSV file.
+        * @param fieldNames The names of the table fields.
+        * @param fieldTypes The types of the table fields.
+        */
+       public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+               this(path, fieldNames, fieldTypes,
+                       IntStream.range(0, fieldNames.length).toArray(),
+                       CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER,
+                       null, false, null, false);
+       }
+
+       /**
+        * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with
+        * a (logically) unlimited number of fields.
+        *
+        * @param path            The path to the CSV file.
+        * @param fieldNames      The names of the table fields.
+        * @param fieldTypes      The types of the table fields.
+        * @param fieldDelim      The field delimiter, "," by default.
+        * @param lineDelim       The row delimiter, "\n" by default.
+        * @param quoteCharacter  An optional quote character for String values, null by default.
+        * @param ignoreFirstLine Flag to ignore the first line, false by default.
+        * @param ignoreComments  An optional prefix to indicate comments, null by default.
+        * @param lenient         Flag to skip records with parse errors instead of failing,
+        *                        false by default.
+        */
+       public CsvTableSource(
+               String path,
+               String[] fieldNames,
+               TypeInformation<?>[] fieldTypes,
+               String fieldDelim,
+               String lineDelim,
+               Character quoteCharacter,
+               boolean ignoreFirstLine,
+               String ignoreComments,
+               boolean lenient) {
+
+               this(path, fieldNames, fieldTypes,
+                       IntStream.range(0, fieldNames.length).toArray(),
+                       fieldDelim, lineDelim,
+                       quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
+       }
+
+       /**
+        * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with
+        * a (logically) unlimited number of fields.
+        *
+        * @param path            The path to the CSV file.
+        * @param fieldNames      The names of the table fields.
+        * @param fieldTypes      The types of the table fields.
+        * @param selectedFields  The fields which will be read and returned by the table
+        *                        source. If null, all fields are returned.
+        * @param fieldDelim      The field delimiter, "," by default.
+        * @param lineDelim       The row delimiter, "\n" by default.
+        * @param quoteCharacter  An optional quote character for String values, null by default.
+        * @param ignoreFirstLine Flag to ignore the first line, false by default.
+        * @param ignoreComments  An optional prefix to indicate comments, null by default.
+        * @param lenient         Flag to skip records with parse errors instead of failing,
+        *                        false by default.
+        */
+       public CsvTableSource(
+               String path,
+               String[] fieldNames,
+               TypeInformation<?>[] fieldTypes,
+               int[] selectedFields,
+               String fieldDelim,
+               String lineDelim,
+               Character quoteCharacter,
+               boolean ignoreFirstLine,
+               String ignoreComments,
+               boolean lenient) {
+               this(new CsvInputFormatConfig(path, fieldNames, fieldTypes, selectedFields,
+                       fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient));
+       }
+
+       private CsvTableSource(CsvInputFormatConfig config) {
+               this.config = config;
+       }
+
+       /**
+        * Returns a new builder that builds a CsvTableSource. For example:
+        * <pre>
+        * CsvTableSource source = CsvTableSource.builder()
+        *     .path("/path/to/your/file.csv")
+        *     .field("myfield", Types.STRING)
+        *     .field("myfield2", Types.INT)
+        *     .build();
+        * </pre>
+        *
+        * @return a new builder to build a CsvTableSource
+        */
+       public static Builder builder() {
+               return new Builder();
+       }
+
+       @Override
+       public TypeInformation<Row> getReturnType() {
+               return new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames());
+       }
+
+       @Override
+       public TableSchema getTableSchema() {
+               return new TableSchema(config.fieldNames, config.fieldTypes);
+       }
+
+       @Override
+       public CsvTableSource projectFields(int[] fields) {
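+               // a projection may select no fields; fall back to the first field
+               // so the source still produces non-empty rows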
+               if (fields.length == 0) {
+                       fields = new int[]{0};
+               }
+               return new CsvTableSource(config.select(fields));
+       }
+
+       @Override
+       public boolean isBounded() {
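+               // a CSV file is a finite input, so this source is always bounded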
+               return true;
+       }
+
+       @Override
+       public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+               return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource());
+       }
+
+       @Override
+       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
+               return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource());
+       }
+
+       @Override
+       public String explainSource() {
+               String[] fields = config.getSelectedFieldNames();
+               return "CsvTableSource(read fields: " + String.join(", ", fields) + ")";
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               CsvTableSource that = (CsvTableSource) o;
+               return Objects.equals(config, that.config);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(config);
+       }
+
+       /**
+        * A builder for creating CsvTableSource instances.
+        */
+       public static class Builder {
+               private LinkedHashMap<String, TypeInformation<?>> schema = new LinkedHashMap<>();
+               private Character quoteCharacter;
+               private String path;
+               private String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
+               private String lineDelim = CsvInputFormat.DEFAULT_LINE_DELIMITER;
+               private boolean isIgnoreFirstLine = false;
+               private String commentPrefix;
+               private boolean lenient = false;
+
+               /**
+                * Sets the path to the CSV file. Required.
+                *
+                * @param path the path to the CSV file
+                */
+               public Builder path(String path) {
+                       this.path = path;
+                       return this;
+               }
+
+               /**
+                * Sets the field delimiter, "," by default.
+                *
+                * @param delim the field delimiter
+                */
+               public Builder fieldDelimiter(String delim) {
+                       this.fieldDelim = delim;
+                       return this;
+               }
+
+               /**
+                * Sets the line delimiter, "\n" by default.
+                *
+                * @param delim the line delimiter
+                */
+               public Builder lineDelimiter(String delim) {
+                       this.lineDelim = delim;
+                       return this;
+               }
+
+               /**
+                * Adds a field with the field name and the type information. Required. This
+                * method can be called multiple times. The call order of this method also
+                * defines the order of the fields in a row.
+                *
+                * @param fieldName the field name
+                * @param fieldType the type information of the field
+                */
+               public Builder field(String fieldName, TypeInformation<?> fieldType) {
+                       if (schema.containsKey(fieldName)) {
+                               throw new IllegalArgumentException("Duplicate field name " + fieldName);
+                       }
+                       schema.put(fieldName, fieldType);
+                       return this;
+               }
+
+               /**
+                * Sets a quote character for String values, null by default.
+                *
+                * @param quote the quote character
+                */
+               public Builder quoteCharacter(Character quote) {
+                       this.quoteCharacter = quote;
+                       return this;
+               }
+
+               /**
+                * Sets a prefix to indicate comments, null by default.
+                *
+                * @param prefix the prefix to indicate comments
+                */
+               public Builder commentPrefix(String prefix) {
+                       this.commentPrefix = prefix;
+                       return this;
+               }
+
+               /**
+                * Ignores the first line. By default, the first line is not skipped.
+                */
+               public Builder ignoreFirstLine() {
+                       this.isIgnoreFirstLine = true;
+                       return this;
+               }
+
+               /**
+                * Skips records with parse errors instead of failing. By default, an exception
+                * is thrown.
+                */
+               public Builder ignoreParseErrors() {
+                       this.lenient = true;
+                       return this;
+               }
+
+               /**
+                * Applies the current values and constructs a newly-created CsvTableSource.
+                *
+                * @return a newly-created CsvTableSource
+                */
+               public CsvTableSource build() {
+                       if (path == null) {
+                               throw new IllegalArgumentException("Path must be defined.");
+                       }
+                       if (schema.isEmpty()) {
+                               throw new IllegalArgumentException("Fields cannot be empty.");
+                       }
+                       return new CsvTableSource(
+                               path,
+                               schema.keySet().toArray(new String[0]),
+                               schema.values().toArray(new TypeInformation<?>[0]),
+                               fieldDelim,
+                               lineDelim,
+                               quoteCharacter,
+                               isIgnoreFirstLine,
+                               commentPrefix,
+                               lenient);
+               }
+
+       }
+
+       private static class CsvInputFormatConfig implements Serializable {
+               private static final long serialVersionUID = 1L;
+
+               private final String path;
+               private final String[] fieldNames;
+               private final TypeInformation<?>[] fieldTypes;
+               private final int[] selectedFields;
+
+               private final String fieldDelim;
+               private final String lineDelim;
+               private final Character quoteCharacter;
+               private final boolean ignoreFirstLine;
+               private final String ignoreComments;
+               private final boolean lenient;
+
+               CsvInputFormatConfig(
+                       String path,
+                       String[] fieldNames,
+                       TypeInformation<?>[] fieldTypes,
+                       int[] selectedFields,
+                       String fieldDelim,
+                       String lineDelim,
+                       Character quoteCharacter,
+                       boolean ignoreFirstLine,
+                       String ignoreComments,
+                       boolean lenient) {
+
+                       this.path = path;
+                       this.fieldNames = fieldNames;
+                       this.fieldTypes = fieldTypes;
+                       this.selectedFields = selectedFields;
+                       this.fieldDelim = fieldDelim;
+                       this.lineDelim = lineDelim;
+                       this.quoteCharacter = quoteCharacter;
+                       this.ignoreFirstLine = ignoreFirstLine;
+                       this.ignoreComments = ignoreComments;
+                       this.lenient = lenient;
+               }
+
+               String[] getSelectedFieldNames() {
+                       String[] selectedFieldNames = new String[selectedFields.length];
+                       for (int i = 0; i < selectedFields.length; i++) {
+                               selectedFieldNames[i] = fieldNames[selectedFields[i]];
+                       }
+                       return selectedFieldNames;
+               }
+
+               TypeInformation<?>[] getSelectedFieldTypes() {
+                       TypeInformation<?>[] selectedFieldTypes = new TypeInformation<?>[selectedFields.length];
+                       for (int i = 0; i < selectedFields.length; i++) {
+                               selectedFieldTypes[i] = fieldTypes[selectedFields[i]];
+                       }
+                       return selectedFieldTypes;
+               }
+
+               RowCsvInputFormat createInputFormat() {
+                       RowCsvInputFormat inputFormat = new RowCsvInputFormat(
+                               new Path(path),
+                               getSelectedFieldTypes(),
+                               lineDelim,
+                               fieldDelim,
+                               selectedFields);
+                       inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine);
+                       inputFormat.setCommentPrefix(ignoreComments);
+                       inputFormat.setLenient(lenient);
+                       if (quoteCharacter != null) {
+                               inputFormat.enableQuotedStringParsing(quoteCharacter);
+                       }
+                       return inputFormat;
+               }
+
+               CsvInputFormatConfig select(int[] fields) {
+                       return new CsvInputFormatConfig(path, fieldNames, fieldTypes, fields,
+                               fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       CsvInputFormatConfig that = (CsvInputFormatConfig) o;
+                       return ignoreFirstLine == that.ignoreFirstLine &&
+                               lenient == that.lenient &&
+                               Objects.equals(path, that.path) &&
+                               Arrays.equals(fieldNames, that.fieldNames) &&
+                               Arrays.equals(fieldTypes, that.fieldTypes) &&
+                               Arrays.equals(selectedFields, that.selectedFields) &&
+                               Objects.equals(fieldDelim, that.fieldDelim) &&
+                               Objects.equals(lineDelim, that.lineDelim) &&
+                               Objects.equals(quoteCharacter, that.quoteCharacter) &&
+                               Objects.equals(ignoreComments, that.ignoreComments);
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = Objects.hash(path, fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine,
+                               ignoreComments, lenient);
+                       result = 31 * result + Arrays.hashCode(fieldNames);
+                       result = 31 * result + Arrays.hashCode(fieldTypes);
+                       result = 31 * result + Arrays.hashCode(selectedFields);
+                       return result;
+               }
+       }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 3a5edd3..e4bcc8c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -116,7 +116,7 @@ abstract class StreamTableEnvironment(
     // TODO TableSourceUtil.validateTableSource(tableSource)
     tableSource match {
       // check for proper stream table source
-      case streamTableSource: StreamTableSource[_] if !streamTableSource.isBounded => // ok
+      case _: StreamTableSource[_] => // ok: StreamEnv can handle both bounded and unbounded sources
       // TODO `TableSourceUtil.hasRowtimeAttribute` depends on [Expression]
      // check that event-time is enabled if table source includes rowtime attributes
       // if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) &&
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 39c59ed..4abf691 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -24,9 +24,8 @@ import org.apache.flink.table.api.{DataTypes, TableConfigOptions, TableSchema, T
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.{BatchTestBase, TestData}
 import org.apache.flink.table.types.TypeInfoDataTypeConverter
-import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource}
+import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSources}
 import org.apache.flink.types.Row
-
 import org.junit.{Before, Test}
 
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
@@ -150,4 +149,23 @@ class TableSourceITCase extends BatchTestBase {
         row(8, "Record_8"))
     )
   }
+
+  @Test
+  def testCsvTableSource(): Unit = {
+    val csvTable = TestTableSources.getPersonCsvTableSource
+    tEnv.registerTableSource("csvTable", csvTable)
+    checkResult(
+      "SELECT id, `first`, `last`, score FROM csvTable",
+      Seq(
+        row(1, "Mike", "Smith", 12.3),
+        row(2, "Bob", "Taylor", 45.6),
+        row(3, "Sam", "Miller", 7.89),
+        row(4, "Peter", "Smith", 0.12),
+        row(5, "Liz", "Williams", 34.5),
+        row(6, "Sally", "Miller", 6.78),
+        row(7, "Alice", "Smith", 90.1),
+        row(8, "Kelly", "Williams", 2.34)
+      )
+    )
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index b1a2fc4..9f4c2c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -26,12 +26,13 @@ import org.apache.flink.table.api.{TableSchema, Types}
 import org.apache.flink.table.runtime.utils.{StreamingTestBase, TestingAppendSink}
 import org.apache.flink.table.util._
 import org.apache.flink.types.Row
-
 import org.junit.Assert._
 import org.junit.Test
 
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
 
+import scala.collection.mutable
+
 class TableSourceITCase extends StreamingTestBase {
 
   @Test
@@ -314,4 +315,25 @@ class TableSourceITCase extends StreamingTestBase {
     val expected = Seq("5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testCsvTableSource(): Unit = {
+    val csvTable = TestTableSources.getPersonCsvTableSource
+    tEnv.registerTableSource("persons", csvTable)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery(
+      "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
+      .toAppendStream[Row]
+      .addSink(sink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,Mike,Smith,12.3",
+      "2,Bob,Taylor,45.6",
+      "3,Sam,Miller,7.89")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index 4c407f8..776d2c4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
 import org.apache.flink.types.Row
 
+import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.util
 import java.util.{Collections, List => JList}
 
@@ -40,6 +41,50 @@ import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+object TestTableSources {
+
+  def getPersonCsvTableSource: CsvTableSource = {
+    val csvRecords = Seq(
+      "First#Id#Score#Last",
+      "Mike#1#12.3#Smith",
+      "Bob#2#45.6#Taylor",
+      "Sam#3#7.89#Miller",
+      "Peter#4#0.12#Smith",
+      "% Just a comment",
+      "Liz#5#34.5#Williams",
+      "Sally#6#6.78#Miller",
+      "Alice#7#90.1#Smith",
+      "Kelly#8#2.34#Williams"
+    )
+
+    val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
+    CsvTableSource.builder()
+      .path(tempFilePath)
+      .field("first", Types.STRING)
+      .field("id", Types.INT)
+      .field("score", Types.DOUBLE)
+      .field("last", Types.STRING)
+      .fieldDelimiter("#")
+      .lineDelimiter("$")
+      .ignoreFirstLine()
+      .commentPrefix("%")
+      .build()
+  }
+
+  private def writeToTempFile(
+      contents: String,
+      filePrefix: String,
+      fileSuffix: String,
+      charset: String = "UTF-8"): String = {
+    val tempFile = File.createTempFile(filePrefix, fileSuffix)
+    tempFile.deleteOnExit()
+    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset)
+    tmpWriter.write(contents)
+    tmpWriter.close()
+    tempFile.getAbsolutePath
+  }
+}
+
 class TestTableSourceWithTime[T](
     isBatch: Boolean,
     tableSchema: TableSchema,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
deleted file mode 100644
index f7215ee..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.CsvInputFormat
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.io.RowCsvInputFormat
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.{TableException, TableSchema}
-
-import scala.collection.mutable
-
-/**
-  * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-  * (logically) unlimited number of fields.
-  *
-  * @param path The path to the CSV file.
-  * @param fieldNames The names of the table fields.
-  * @param fieldTypes The types of the table fields.
-  * @param selectedFields The fields which will be read and returned by the table source.
-  *                       If None, all fields are returned.
-  * @param fieldDelim The field delimiter, "," by default.
-  * @param rowDelim The row delimiter, "\n" by default.
-  * @param quoteCharacter An optional quote character for String values, null by default.
-  * @param ignoreFirstLine Flag to ignore the first line, false by default.
-  * @param ignoreComments An optional prefix to indicate comments, null by default.
-  * @param lenient Flag to skip records with parse error instead to fail, false by default.
-  */
-class CsvTableSource private (
-    private val path: String,
-    private val fieldNames: Array[String],
-    private val fieldTypes: Array[TypeInformation[_]],
-    private val selectedFields: Array[Int],
-    private val fieldDelim: String,
-    private val rowDelim: String,
-    private val quoteCharacter: Character,
-    private val ignoreFirstLine: Boolean,
-    private val ignoreComments: String,
-    private val lenient: Boolean)
-  extends BatchTableSource[Row]
-  with StreamTableSource[Row]
-  with ProjectableTableSource[Row] {
-
-  /**
-    * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-    * (logically) unlimited number of fields.
-    *
-    * @param path The path to the CSV file.
-    * @param fieldNames The names of the table fields.
-    * @param fieldTypes The types of the table fields.
-    * @param fieldDelim The field delimiter, "," by default.
-    * @param rowDelim The row delimiter, "\n" by default.
-    * @param quoteCharacter An optional quote character for String values, null by default.
-    * @param ignoreFirstLine Flag to ignore the first line, false by default.
-    * @param ignoreComments An optional prefix to indicate comments, null by default.
-    * @param lenient Flag to skip records with parse error instead to fail, false by default.
-    */
-  def this(
-    path: String,
-    fieldNames: Array[String],
-    fieldTypes: Array[TypeInformation[_]],
-    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-    quoteCharacter: Character = null,
-    ignoreFirstLine: Boolean = false,
-    ignoreComments: String = null,
-    lenient: Boolean = false) = {
-
-    this(
-      path,
-      fieldNames,
-      fieldTypes,
-      fieldTypes.indices.toArray, // initially, all fields are returned
-      fieldDelim,
-      rowDelim,
-      quoteCharacter,
-      ignoreFirstLine,
-      ignoreComments,
-      lenient)
-
-  }
-
-  /**
-    * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-    * (logically) unlimited number of fields.
-    *
-    * @param path The path to the CSV file.
-    * @param fieldNames The names of the table fields.
-    * @param fieldTypes The types of the table fields.
-    */
-  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
-    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
-  }
-
-  if (fieldNames.length != fieldTypes.length) {
-    throw new TableException("Number of field names and field types must be equal.")
-  }
-
-  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
-  private val selectedFieldNames = selectedFields.map(fieldNames(_))
-
-  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)
-
-  /**
-    * Returns the data of the table as a [[DataSet]] of [[Row]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    *       Do not use it in Table API programs.
-    */
-  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
-    execEnv.createInput(createCsvInput(), returnType).name(explainSource())
-  }
-
-  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
-  override def getReturnType: RowTypeInfo = returnType
-
-  /**
-    * Returns the data of the table as a [[DataStream]] of [[Row]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    *       Do not use it in Table API programs.
-    */
-  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
-    streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
-  }
-
-  /** Returns the schema of the produced table. */
-  override def getTableSchema = new TableSchema(fieldNames, fieldTypes)
-
-  /** Returns a copy of [[TableSource]] with ability to project fields */
-  override def projectFields(fields: Array[Int]): CsvTableSource = {
-
-    val selectedFields = if (fields.isEmpty) Array(0) else fields
-
-    new CsvTableSource(
-      path,
-      fieldNames,
-      fieldTypes,
-      selectedFields,
-      fieldDelim,
-      rowDelim,
-      quoteCharacter,
-      ignoreFirstLine,
-      ignoreComments,
-      lenient)
-  }
-
-  private def createCsvInput(): RowCsvInputFormat = {
-    val inputFormat = new RowCsvInputFormat(
-      new Path(path),
-      selectedFieldTypes,
-      rowDelim,
-      fieldDelim,
-      selectedFields)
-
-    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
-    inputFormat.setLenient(lenient)
-    if (quoteCharacter != null) {
-      inputFormat.enableQuotedStringParsing(quoteCharacter)
-    }
-    if (ignoreComments != null) {
-      inputFormat.setCommentPrefix(ignoreComments)
-    }
-
-    inputFormat
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: CsvTableSource => returnType == that.returnType &&
-        path == that.path &&
-        fieldDelim == that.fieldDelim &&
-        rowDelim == that.rowDelim &&
-        quoteCharacter == that.quoteCharacter &&
-        ignoreFirstLine == that.ignoreFirstLine &&
-        ignoreComments == that.ignoreComments &&
-        lenient == that.lenient
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    returnType.hashCode()
-  }
-
-  override def explainSource(): String = {
-    s"CsvTableSource(" +
-      s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
-  }
-}
-
-object CsvTableSource {
-
-  /**
-    * A builder for creating [[CsvTableSource]] instances.
-    *
-    * For example:
-    *
-    * {{{
-    *   val source: CsvTableSource = new CsvTableSource.builder()
-    *     .path("/path/to/your/file.csv")
-    *     .field("myfield", Types.STRING)
-    *     .field("myfield2", Types.INT)
-    *     .build()
-    * }}}
-    *
-    */
-  class Builder {
-
-    private val schema: mutable.LinkedHashMap[String, TypeInformation[_]] =
-      mutable.LinkedHashMap[String, TypeInformation[_]]()
-    private var quoteCharacter: Character = _
-    private var path: String = _
-    private var fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER
-    private var lineDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER
-    private var isIgnoreFirstLine: Boolean = false
-    private var commentPrefix: String = _
-    private var lenient: Boolean = false
-
-    /**
-      * Sets the path to the CSV file. Required.
-      *
-      * @param path the path to the CSV file
-      */
-    def path(path: String): Builder = {
-      this.path = path
-      this
-    }
-
-    /**
-      * Sets the field delimiter, "," by default.
-      *
-      * @param delim the field delimiter
-      */
-    def fieldDelimiter(delim: String): Builder = {
-      this.fieldDelim = delim
-      this
-    }
-
-    /**
-      * Sets the line delimiter, "\n" by default.
-      *
-      * @param delim the line delimiter
-      */
-    def lineDelimiter(delim: String): Builder = {
-      this.lineDelim = delim
-      this
-    }
-
-    /**
-      * Adds a field with the field name and the type information. Required.
-      * This method can be called multiple times. The call order of this method defines
-      * also the order of the fields in a row.
-      *
-      * @param fieldName the field name
-      * @param fieldType the type information of the field
-      */
-    def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
-      if (schema.contains(fieldName)) {
-        throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
-      }
-      schema += (fieldName -> fieldType)
-      this
-    }
-
-    /**
-      * Sets a quote character for String values, null by default.
-      *
-      * @param quote the quote character
-      */
-    def quoteCharacter(quote: Character): Builder = {
-      this.quoteCharacter = quote
-      this
-    }
-
-    /**
-      * Sets a prefix to indicate comments, null by default.
-      *
-      * @param prefix the prefix to indicate comments
-      */
-    def commentPrefix(prefix: String): Builder = {
-      this.commentPrefix = prefix
-      this
-    }
-
-    /**
-      * Ignore the first line. Not skip the first line by default.
-      */
-    def ignoreFirstLine(): Builder = {
-      this.isIgnoreFirstLine = true
-      this
-    }
-
-    /**
-      * Skip records with parse error instead to fail. Throw an exception by default.
-      */
-    def ignoreParseErrors(): Builder = {
-      this.lenient = true
-      this
-    }
-
-    /**
-      * Apply the current values and constructs a newly-created [[CsvTableSource]].
-      *
-      * @return a newly-created [[CsvTableSource]].
-      */
-    def build(): CsvTableSource = {
-      if (path == null) {
-        throw new IllegalArgumentException("Path must be defined.")
-      }
-      if (schema.isEmpty) {
-        throw new IllegalArgumentException("Fields can not be empty.")
-      }
-      new CsvTableSource(
-        path,
-        schema.keys.toArray,
-        schema.values.toArray,
-        fieldDelim,
-        lineDelim,
-        quoteCharacter,
-        isIgnoreFirstLine,
-        commentPrefix,
-        lenient)
-    }
-
-  }
-
-  /**
-    * Return a new builder that builds a [[CsvTableSource]].
-    *
-    * For example:
-    *
-    * {{{
-    *   val source: CsvTableSource = CsvTableSource
-    *     .builder()
-    *     .path("/path/to/your/file.csv")
-    *     .field("myfield", Types.STRING)
-    *     .field("myfield2", Types.INT)
-    *     .build()
-    * }}}
-    * @return a new builder to build a [[CsvTableSource]]
-    */
-  def builder(): Builder = new Builder
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index fdf834c..ba55e53 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.mutable
+import _root_.scala.collection.mutable
 
 class TableSourceITCase extends AbstractTestBase {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index aff208c..0d633fe 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -46,20 +46,17 @@ object CommonTestData {
     )
 
    val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
-    new CsvTableSource(
-      tempFilePath,
-      Array("first", "id", "score", "last"),
-      Array(
-        Types.STRING,
-        Types.INT,
-        Types.DOUBLE,
-        Types.STRING
-      ),
-      fieldDelim = "#",
-      rowDelim = "$",
-      ignoreFirstLine = true,
-      ignoreComments = "%"
-    )
+    CsvTableSource.builder()
+      .path(tempFilePath)
+      .field("first", Types.STRING)
+      .field("id", Types.INT)
+      .field("score", Types.DOUBLE)
+      .field("last", Types.STRING)
+      .fieldDelimiter("#")
+      .lineDelimiter("$")
+      .ignoreFirstLine()
+      .commentPrefix("%")
+      .build()
   }
 
   def getInMemoryTestCatalog(isStreaming: Boolean): ExternalCatalog = {
