This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 635cd554e94e2c87858785f7e1227aa804eedc5e Author: Gen Luo <luogen...@gmail.com> AuthorDate: Tue Jun 25 19:40:48 2019 +0800 [FLINK-12977][table] Port CsvTableSource to api-java-bridge This closes #8872 --- .../apache/flink/table/sources/CsvTableSource.java | 431 +++++++++++++++++++++ .../flink/table/api/StreamTableEnvironment.scala | 2 +- .../runtime/batch/sql/TableSourceITCase.scala | 22 +- .../runtime/stream/sql/TableSourceITCase.scala | 24 +- .../apache/flink/table/util/testTableSources.scala | 45 +++ .../flink/table/sources/CsvTableSource.scala | 364 ----------------- .../runtime/stream/sql/TableSourceITCase.scala | 2 +- .../flink/table/runtime/utils/CommonTestData.scala | 25 +- 8 files changed, 532 insertions(+), 383 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java new file mode 100644 index 0000000..160bc9a --- /dev/null +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sources; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.RowCsvInputFormat; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Objects; +import java.util.stream.IntStream; + +/** + * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with a + * (logically) unlimited number of fields. + */ +public class CsvTableSource + implements StreamTableSource<Row>, BatchTableSource<Row>, ProjectableTableSource<Row> { + + private final CsvInputFormatConfig config; + + /** + * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with + * a (logically) unlimited number of fields. + * + * @param path The path to the CSV file. + * @param fieldNames The names of the table fields. + * @param fieldTypes The types of the table fields. + */ + public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes) { + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, + null, false, null, false); + } + + /** + * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with + * a (logically) unlimited number of fields. + * + * @param path The path to the CSV file. + * @param fieldNames The names of the table fields. 
+ * @param fieldTypes The types of the table fields. + * @param fieldDelim The field delimiter, "," by default. + * @param lineDelim The row delimiter, "\n" by default. + * @param quoteCharacter An optional quote character for String values, null by default. + * @param ignoreFirstLine Flag to ignore the first line, false by default. + * @param ignoreComments An optional prefix to indicate comments, null by default. + * @param lenient Flag to skip records with parse errors instead of failing, false by + * default. + */ + public CsvTableSource( + String path, + String[] fieldNames, + TypeInformation<?>[] fieldTypes, + String fieldDelim, + String lineDelim, + Character quoteCharacter, + boolean ignoreFirstLine, + String ignoreComments, + boolean lenient) { + + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + fieldDelim, lineDelim, + quoteCharacter, ignoreFirstLine, ignoreComments, lenient); + } + + /** + * A {@link StreamTableSource} and {@link BatchTableSource} for simple CSV files with + * a (logically) unlimited number of fields. + * + * @param path The path to the CSV file. + * @param fieldNames The names of the table fields. + * @param fieldTypes The types of the table fields. + * @param selectedFields The indices of the fields which will be read and returned by the + * table source. Must not be null. + * @param fieldDelim The field delimiter, "," by default. + * @param lineDelim The row delimiter, "\n" by default. + * @param quoteCharacter An optional quote character for String values, null by default. + * @param ignoreFirstLine Flag to ignore the first line, false by default. + * @param ignoreComments An optional prefix to indicate comments, null by default. + * @param lenient Flag to skip records with parse errors instead of failing, false by + * default. 
+ */ + public CsvTableSource( + String path, + String[] fieldNames, + TypeInformation<?>[] fieldTypes, + int[] selectedFields, + String fieldDelim, + String lineDelim, + Character quoteCharacter, + boolean ignoreFirstLine, + String ignoreComments, + boolean lenient) { + this(new CsvInputFormatConfig(path, fieldNames, fieldTypes, selectedFields, + fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient)); + } + + private CsvTableSource(CsvInputFormatConfig config) { + this.config = config; + } + + /** + * Returns a new builder that builds a CsvTableSource. For example: + * <pre> + * CsvTableSource source = CsvTableSource.builder() + * .path("/path/to/your/file.csv") + * .field("myfield", Types.STRING) + * .field("myfield2", Types.INT) + * .build(); + * </pre> + * + * @return a new builder to build a CsvTableSource + */ + public static Builder builder() { + return new Builder(); + } + + @Override + public TypeInformation<Row> getReturnType() { + return new RowTypeInfo(config.getSelectedFieldTypes(), config.getSelectedFieldNames()); + } + + @Override + public TableSchema getTableSchema() { + return new TableSchema(config.fieldNames, config.fieldTypes); + } + + @Override + public CsvTableSource projectFields(int[] fields) { + if (fields.length == 0) { + fields = new int[]{0}; + } + return new CsvTableSource(config.select(fields)); + } + + @Override + public boolean isBounded() { + return true; + } + + @Override + public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) { + return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource()); + } + + @Override + public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) { + return execEnv.createInput(config.createInputFormat(), getReturnType()).name(explainSource()); + } + + @Override + public String explainSource() { + String[] fields = config.getSelectedFieldNames(); + return "CsvTableSource(read fields: " + String.join(", ", fields) + ")"; + } + + 
@Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CsvTableSource that = (CsvTableSource) o; + return Objects.equals(config, that.config); + } + + @Override + public int hashCode() { + return Objects.hash(config); + } + + /** + * A builder for creating CsvTableSource instances. + */ + public static class Builder { + private LinkedHashMap<String, TypeInformation<?>> schema = new LinkedHashMap<>(); + private Character quoteCharacter; + private String path; + private String fieldDelim = CsvInputFormat.DEFAULT_FIELD_DELIMITER; + private String lineDelim = CsvInputFormat.DEFAULT_LINE_DELIMITER; + private boolean isIgnoreFirstLine = false; + private String commentPrefix; + private boolean lenient = false; + + /** + * Sets the path to the CSV file. Required. + * + * @param path the path to the CSV file + */ + public Builder path(String path) { + this.path = path; + return this; + } + + /** + * Sets the field delimiter, "," by default. + * + * @param delim the field delimiter + */ + public Builder fieldDelimiter(String delim) { + this.fieldDelim = delim; + return this; + } + + /** + * Sets the line delimiter, "\n" by default. + * + * @param delim the line delimiter + */ + public Builder lineDelimiter(String delim) { + this.lineDelim = delim; + return this; + } + + /** + * Adds a field with the field name and the type information. Required. This method can be + * called multiple times. The call order of this method defines also the order of the fields + * in a row. 
+ * + * @param fieldName the field name + * @param fieldType the type information of the field + */ + public Builder field(String fieldName, TypeInformation<?> fieldType) { + if (schema.containsKey(fieldName)) { + throw new IllegalArgumentException("Duplicate field name " + fieldName); + } + schema.put(fieldName, fieldType); + return this; + } + + /** + * Sets a quote character for String values, null by default. + * + * @param quote the quote character + */ + public Builder quoteCharacter(Character quote) { + this.quoteCharacter = quote; + return this; + } + + /** + * Sets a prefix to indicate comments, null by default. + * + * @param prefix the prefix to indicate comments + */ + public Builder commentPrefix(String prefix) { + this.commentPrefix = prefix; + return this; + } + + /** + * Ignores the first line. By default, the first line is not skipped. + */ + public Builder ignoreFirstLine() { + this.isIgnoreFirstLine = true; + return this; + } + + /** + * Skips records with parse errors instead of failing. By default, an exception is thrown. + */ + public Builder ignoreParseErrors() { + this.lenient = true; + return this; + } + + /** + * Applies the current values and constructs a newly-created CsvTableSource. 
+ * + * @return a newly-created CsvTableSource + */ + public CsvTableSource build() { + if (path == null) { + throw new IllegalArgumentException("Path must be defined."); + } + if (schema.isEmpty()) { + throw new IllegalArgumentException("Fields can not be empty."); + } + return new CsvTableSource( + path, + schema.keySet().toArray(new String[0]), + schema.values().toArray(new TypeInformation<?>[0]), + fieldDelim, + lineDelim, + quoteCharacter, + isIgnoreFirstLine, + commentPrefix, + lenient); + } + + } + + private static class CsvInputFormatConfig implements Serializable { + private static final long serialVersionUID = 1L; + + private final String path; + private final String[] fieldNames; + private final TypeInformation<?>[] fieldTypes; + private final int[] selectedFields; + + private final String fieldDelim; + private final String lineDelim; + private final Character quoteCharacter; + private final boolean ignoreFirstLine; + private final String ignoreComments; + private final boolean lenient; + + CsvInputFormatConfig( + String path, + String[] fieldNames, + TypeInformation<?>[] fieldTypes, + int[] selectedFields, + String fieldDelim, + String lineDelim, + Character quoteCharacter, + boolean ignoreFirstLine, + String ignoreComments, + boolean lenient) { + + this.path = path; + this.fieldNames = fieldNames; + this.fieldTypes = fieldTypes; + this.selectedFields = selectedFields; + this.fieldDelim = fieldDelim; + this.lineDelim = lineDelim; + this.quoteCharacter = quoteCharacter; + this.ignoreFirstLine = ignoreFirstLine; + this.ignoreComments = ignoreComments; + this.lenient = lenient; + } + + String[] getSelectedFieldNames() { + String[] selectedFieldNames = new String[selectedFields.length]; + for (int i = 0; i < selectedFields.length; i++) { + selectedFieldNames[i] = fieldNames[selectedFields[i]]; + } + return selectedFieldNames; + } + + TypeInformation<?>[] getSelectedFieldTypes() { + TypeInformation<?>[] selectedFieldTypes = new 
TypeInformation<?>[selectedFields.length]; + for (int i = 0; i < selectedFields.length; i++) { + selectedFieldTypes[i] = fieldTypes[selectedFields[i]]; + } + return selectedFieldTypes; + } + + RowCsvInputFormat createInputFormat() { + RowCsvInputFormat inputFormat = new RowCsvInputFormat( + new Path(path), + getSelectedFieldTypes(), + lineDelim, + fieldDelim, + selectedFields); + inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine); + inputFormat.setCommentPrefix(ignoreComments); + inputFormat.setLenient(lenient); + if (quoteCharacter != null) { + inputFormat.enableQuotedStringParsing(quoteCharacter); + } + return inputFormat; + } + + CsvInputFormatConfig select(int[] fields) { + return new CsvInputFormatConfig(path, fieldNames, fieldTypes, fields, + fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CsvInputFormatConfig that = (CsvInputFormatConfig) o; + return ignoreFirstLine == that.ignoreFirstLine && + lenient == that.lenient && + Objects.equals(path, that.path) && + Arrays.equals(fieldNames, that.fieldNames) && + Arrays.equals(fieldTypes, that.fieldTypes) && + Arrays.equals(selectedFields, that.selectedFields) && + Objects.equals(fieldDelim, that.fieldDelim) && + Objects.equals(lineDelim, that.lineDelim) && + Objects.equals(quoteCharacter, that.quoteCharacter) && + Objects.equals(ignoreComments, that.ignoreComments); + } + + @Override + public int hashCode() { + int result = Objects.hash(path, fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, + ignoreComments, lenient); + result = 31 * result + Arrays.hashCode(fieldNames); + result = 31 * result + Arrays.hashCode(fieldTypes); + result = 31 * result + Arrays.hashCode(selectedFields); + return result; + } + } +} diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 3a5edd3..e4bcc8c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -116,7 +116,7 @@ abstract class StreamTableEnvironment( // TODO TableSourceUtil.validateTableSource(tableSource) tableSource match { // check for proper stream table source - case streamTableSource: StreamTableSource[_] if !streamTableSource.isBounded => // ok + case _: StreamTableSource[_] => // StreamEnv can handle both bounded and unbounded ok // TODO `TableSourceUtil.hasRowtimeAttribute` depends on [Expression] // check that event-time is enabled if table source includes rowtime attributes // if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) && diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala index 39c59ed..4abf691 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala @@ -24,9 +24,8 @@ import org.apache.flink.table.api.{DataTypes, TableConfigOptions, TableSchema, T import org.apache.flink.table.runtime.utils.BatchTestBase.row import org.apache.flink.table.runtime.utils.{BatchTestBase, TestData} import org.apache.flink.table.types.TypeInfoDataTypeConverter -import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource} 
+import org.apache.flink.table.util.{TestFilterableTableSource, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSources} import org.apache.flink.types.Row - import org.junit.{Before, Test} import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong} @@ -150,4 +149,23 @@ class TableSourceITCase extends BatchTestBase { row(8, "Record_8")) ) } + + @Test + def testCsvTableSource(): Unit = { + val csvTable = TestTableSources.getPersonCsvTableSource + tEnv.registerTableSource("csvTable", csvTable) + checkResult( + "SELECT id, `first`, `last`, score FROM csvTable", + Seq( + row(1, "Mike", "Smith", 12.3), + row(2, "Bob", "Taylor", 45.6), + row(3, "Sam", "Miller", 7.89), + row(4, "Peter", "Smith", 0.12), + row(5, "Liz", "Williams", 34.5), + row(6, "Sally", "Miller", 6.78), + row(7, "Alice", "Smith", 90.1), + row(8, "Kelly", "Williams", 2.34) + ) + ) + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala index b1a2fc4..9f4c2c3 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala @@ -26,12 +26,13 @@ import org.apache.flink.table.api.{TableSchema, Types} import org.apache.flink.table.runtime.utils.{StreamingTestBase, TestingAppendSink} import org.apache.flink.table.util._ import org.apache.flink.types.Row - import org.junit.Assert._ import org.junit.Test import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong} +import scala.collection.mutable + class TableSourceITCase extends StreamingTestBase { @Test @@ -314,4 +315,25 @@ class TableSourceITCase extends StreamingTestBase { val expected = Seq("5,Record_5", "6,Record_6", "7,Record_7", 
"8,Record_8") assertEquals(expected.sorted, sink.getAppendResults.sorted) } + + @Test + def testCsvTableSource(): Unit = { + val csvTable = TestTableSources.getPersonCsvTableSource + tEnv.registerTableSource("persons", csvTable) + + val sink = new TestingAppendSink() + tEnv.sqlQuery( + "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ") + .toAppendStream[Row] + .addSink(sink) + + env.execute() + + val expected = mutable.MutableList( + "1,Mike,Smith,12.3", + "2,Bob,Taylor,45.6", + "3,Sam,Miller,7.89") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala index 4c407f8..776d2c4 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala @@ -33,6 +33,7 @@ import org.apache.flink.table.sources.tsextractors.ExistingField import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks} import org.apache.flink.types.Row +import java.io.{File, FileOutputStream, OutputStreamWriter} import java.util import java.util.{Collections, List => JList} @@ -40,6 +41,50 @@ import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.mutable +object TestTableSources { + + def getPersonCsvTableSource: CsvTableSource = { + val csvRecords = Seq( + "First#Id#Score#Last", + "Mike#1#12.3#Smith", + "Bob#2#45.6#Taylor", + "Sam#3#7.89#Miller", + "Peter#4#0.12#Smith", + "% Just a comment", + "Liz#5#34.5#Williams", + "Sally#6#6.78#Miller", + "Alice#7#90.1#Smith", + "Kelly#8#2.34#Williams" + ) + + val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp") + CsvTableSource.builder() + 
.path(tempFilePath) + .field("first", Types.STRING) + .field("id", Types.INT) + .field("score",Types.DOUBLE) + .field("last",Types.STRING) + .fieldDelimiter("#") + .lineDelimiter("$") + .ignoreFirstLine() + .commentPrefix("%") + .build() + } + + private def writeToTempFile( + contents: String, + filePrefix: String, + fileSuffix: String, + charset: String = "UTF-8"): String = { + val tempFile = File.createTempFile(filePrefix, fileSuffix) + tempFile.deleteOnExit() + val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset) + tmpWriter.write(contents) + tmpWriter.close() + tempFile.getAbsolutePath + } +} + class TestTableSourceWithTime[T]( isBatch: Boolean, tableSchema: TableSchema, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala deleted file mode 100644 index f7215ee..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.sources - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.io.CsvInputFormat -import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.types.Row -import org.apache.flink.api.java.io.RowCsvInputFormat -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.core.fs.Path -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.api.{TableException, TableSchema} - -import scala.collection.mutable - -/** - * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a - * (logically) unlimited number of fields. - * - * @param path The path to the CSV file. - * @param fieldNames The names of the table fields. - * @param fieldTypes The types of the table fields. - * @param selectedFields The fields which will be read and returned by the table source. - * If None, all fields are returned. - * @param fieldDelim The field delimiter, "," by default. - * @param rowDelim The row delimiter, "\n" by default. - * @param quoteCharacter An optional quote character for String values, null by default. - * @param ignoreFirstLine Flag to ignore the first line, false by default. - * @param ignoreComments An optional prefix to indicate comments, null by default. - * @param lenient Flag to skip records with parse error instead to fail, false by default. 
- */ -class CsvTableSource private ( - private val path: String, - private val fieldNames: Array[String], - private val fieldTypes: Array[TypeInformation[_]], - private val selectedFields: Array[Int], - private val fieldDelim: String, - private val rowDelim: String, - private val quoteCharacter: Character, - private val ignoreFirstLine: Boolean, - private val ignoreComments: String, - private val lenient: Boolean) - extends BatchTableSource[Row] - with StreamTableSource[Row] - with ProjectableTableSource[Row] { - - /** - * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a - * (logically) unlimited number of fields. - * - * @param path The path to the CSV file. - * @param fieldNames The names of the table fields. - * @param fieldTypes The types of the table fields. - * @param fieldDelim The field delimiter, "," by default. - * @param rowDelim The row delimiter, "\n" by default. - * @param quoteCharacter An optional quote character for String values, null by default. - * @param ignoreFirstLine Flag to ignore the first line, false by default. - * @param ignoreComments An optional prefix to indicate comments, null by default. - * @param lenient Flag to skip records with parse error instead to fail, false by default. - */ - def this( - path: String, - fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]], - fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER, - rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER, - quoteCharacter: Character = null, - ignoreFirstLine: Boolean = false, - ignoreComments: String = null, - lenient: Boolean = false) = { - - this( - path, - fieldNames, - fieldTypes, - fieldTypes.indices.toArray, // initially, all fields are returned - fieldDelim, - rowDelim, - quoteCharacter, - ignoreFirstLine, - ignoreComments, - lenient) - - } - - /** - * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a - * (logically) unlimited number of fields. 
- * - * @param path The path to the CSV file. - * @param fieldNames The names of the table fields. - * @param fieldTypes The types of the table fields. - */ - def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = { - this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER, - CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false) - } - - if (fieldNames.length != fieldTypes.length) { - throw new TableException("Number of field names and field types must be equal.") - } - - private val selectedFieldTypes = selectedFields.map(fieldTypes(_)) - private val selectedFieldNames = selectedFields.map(fieldNames(_)) - - private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames) - - /** - * Returns the data of the table as a [[DataSet]] of [[Row]]. - * - * NOTE: This method is for internal use only for defining a [[TableSource]]. - * Do not use it in Table API programs. - */ - override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { - execEnv.createInput(createCsvInput(), returnType).name(explainSource()) - } - - /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */ - override def getReturnType: RowTypeInfo = returnType - - /** - * Returns the data of the table as a [[DataStream]] of [[Row]]. - * - * NOTE: This method is for internal use only for defining a [[TableSource]]. - * Do not use it in Table API programs. - */ - override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = { - streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource()) - } - - /** Returns the schema of the produced table. 
*/ - override def getTableSchema = new TableSchema(fieldNames, fieldTypes) - - /** Returns a copy of [[TableSource]] with ability to project fields */ - override def projectFields(fields: Array[Int]): CsvTableSource = { - - val selectedFields = if (fields.isEmpty) Array(0) else fields - - new CsvTableSource( - path, - fieldNames, - fieldTypes, - selectedFields, - fieldDelim, - rowDelim, - quoteCharacter, - ignoreFirstLine, - ignoreComments, - lenient) - } - - private def createCsvInput(): RowCsvInputFormat = { - val inputFormat = new RowCsvInputFormat( - new Path(path), - selectedFieldTypes, - rowDelim, - fieldDelim, - selectedFields) - - inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine) - inputFormat.setLenient(lenient) - if (quoteCharacter != null) { - inputFormat.enableQuotedStringParsing(quoteCharacter) - } - if (ignoreComments != null) { - inputFormat.setCommentPrefix(ignoreComments) - } - - inputFormat - } - - override def equals(other: Any): Boolean = other match { - case that: CsvTableSource => returnType == that.returnType && - path == that.path && - fieldDelim == that.fieldDelim && - rowDelim == that.rowDelim && - quoteCharacter == that.quoteCharacter && - ignoreFirstLine == that.ignoreFirstLine && - ignoreComments == that.ignoreComments && - lenient == that.lenient - case _ => false - } - - override def hashCode(): Int = { - returnType.hashCode() - } - - override def explainSource(): String = { - s"CsvTableSource(" + - s"read fields: ${getReturnType.getFieldNames.mkString(", ")})" - } -} - -object CsvTableSource { - - /** - * A builder for creating [[CsvTableSource]] instances. 
- * - * For example: - * - * {{{ - * val source: CsvTableSource = new CsvTableSource.builder() - * .path("/path/to/your/file.csv") - * .field("myfield", Types.STRING) - * .field("myfield2", Types.INT) - * .build() - * }}} - * - */ - class Builder { - - private val schema: mutable.LinkedHashMap[String, TypeInformation[_]] = - mutable.LinkedHashMap[String, TypeInformation[_]]() - private var quoteCharacter: Character = _ - private var path: String = _ - private var fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER - private var lineDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER - private var isIgnoreFirstLine: Boolean = false - private var commentPrefix: String = _ - private var lenient: Boolean = false - - /** - * Sets the path to the CSV file. Required. - * - * @param path the path to the CSV file - */ - def path(path: String): Builder = { - this.path = path - this - } - - /** - * Sets the field delimiter, "," by default. - * - * @param delim the field delimiter - */ - def fieldDelimiter(delim: String): Builder = { - this.fieldDelim = delim - this - } - - /** - * Sets the line delimiter, "\n" by default. - * - * @param delim the line delimiter - */ - def lineDelimiter(delim: String): Builder = { - this.lineDelim = delim - this - } - - /** - * Adds a field with the field name and the type information. Required. - * This method can be called multiple times. The call order of this method defines - * also the order of the fields in a row. - * - * @param fieldName the field name - * @param fieldType the type information of the field - */ - def field(fieldName: String, fieldType: TypeInformation[_]): Builder = { - if (schema.contains(fieldName)) { - throw new IllegalArgumentException(s"Duplicate field name $fieldName.") - } - schema += (fieldName -> fieldType) - this - } - - /** - * Sets a quote character for String values, null by default. 
- * - * @param quote the quote character - */ - def quoteCharacter(quote: Character): Builder = { - this.quoteCharacter = quote - this - } - - /** - * Sets a prefix to indicate comments, null by default. - * - * @param prefix the prefix to indicate comments - */ - def commentPrefix(prefix: String): Builder = { - this.commentPrefix = prefix - this - } - - /** - * Ignore the first line. Not skip the first line by default. - */ - def ignoreFirstLine(): Builder = { - this.isIgnoreFirstLine = true - this - } - - /** - * Skip records with parse error instead to fail. Throw an exception by default. - */ - def ignoreParseErrors(): Builder = { - this.lenient = true - this - } - - /** - * Apply the current values and constructs a newly-created [[CsvTableSource]]. - * - * @return a newly-created [[CsvTableSource]]. - */ - def build(): CsvTableSource = { - if (path == null) { - throw new IllegalArgumentException("Path must be defined.") - } - if (schema.isEmpty) { - throw new IllegalArgumentException("Fields can not be empty.") - } - new CsvTableSource( - path, - schema.keys.toArray, - schema.values.toArray, - fieldDelim, - lineDelim, - quoteCharacter, - isIgnoreFirstLine, - commentPrefix, - lenient) - } - - } - - /** - * Return a new builder that builds a [[CsvTableSource]]. 
- * - * For example: - * - * {{{ - * val source: CsvTableSource = CsvTableSource - * .builder() - * .path("/path/to/your/file.csv") - * .field("myfield", Types.STRING) - * .field("myfield2", Types.INT) - * .build() - * }}} - * @return a new builder to build a [[CsvTableSource]] - */ - def builder(): Builder = new Builder -} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala index fdf834c..ba55e53 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala @@ -27,7 +27,7 @@ import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test -import scala.collection.mutable +import _root_.scala.collection.mutable class TableSourceITCase extends AbstractTestBase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala index aff208c..0d633fe 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala @@ -46,20 +46,17 @@ object CommonTestData { ) val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp") - new CsvTableSource( - tempFilePath, - Array("first", "id", "score", "last"), - Array( - Types.STRING, - Types.INT, - Types.DOUBLE, - Types.STRING - ), - fieldDelim = "#", - rowDelim = "$", - ignoreFirstLine = true, - ignoreComments = "%" - ) + CsvTableSource.builder() + .path(tempFilePath) + .field("first",Types.STRING) + 
.field("id",Types.INT) + .field("score",Types.DOUBLE) + .field("last",Types.STRING) + .fieldDelimiter("#") + .lineDelimiter("$") + .ignoreFirstLine() + .commentPrefix("%") + .build() } def getInMemoryTestCatalog(isStreaming: Boolean): ExternalCatalog = {