Hi Cliff,

you are right. The CsvTableSink and the CsvInputFormat are not in sync. It would be great if you could open a JIRA ticket for this issue.

As a workaround, you could implement your own CsvTableSink that adds a delimiter after the last field. The code is straightforward: fewer than 150 lines of simple Scala code [1] (see CsvFormatter at the end of the file).
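A minimal sketch of that change, assuming you copy CsvTableSink and only modify the formatting loop in its CsvFormatter. The object TrailingDelimiterFormatter below is hypothetical and works on a plain Seq of field values instead of a Flink Row, so it runs on its own. Every field, including the last one, is followed by the delimiter, and null fields are written as empty strings. A row whose trailing number is missing therefore becomes "a|b||" instead of "a|b|". Whether RowCsvInputFormat then reads that empty field as null depends on how the input format is configured, so please verify this against your second stage.

```scala
// Hypothetical standalone sketch (not Flink's API): writes one row's fields
// as a CSV line that ends with the delimiter, e.g. "a|b|c|" instead of "a|b|c".
object TrailingDelimiterFormatter {

  /** Joins `fields` with `fieldDelim` and appends a final delimiter.
    * Null fields are written as empty strings. */
  def format(fields: Seq[Any], fieldDelim: String): String = {
    val builder = new StringBuilder
    for (v <- fields) {
      // write the value (nothing for null), then always the delimiter
      if (v != null) builder.append(v.toString)
      builder.append(fieldDelim)
    }
    builder.toString
  }

  def main(args: Array[String]): Unit = {
    println(format(Seq("a", "b", 3), "|"))    // a|b|3|
    println(format(Seq("a", "b", null), "|")) // a|b||
  }
}
```

In the copied sink, the same loop would iterate from 0 until `row.getArity` and read each value with `row.getField(i)` inside CsvFormatter's map method.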
Best, Fabian

[1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala

2017-12-22 21:34 GMT+01:00 Cliff Resnick:

> I've been trying out the Table API for some ETL using a two-stage job of
> CsvTableSink (DataSet) -> CsvInputFormat (Stream). I ran into an issue
> where the first stage produces output with trailing null values (valid),
> which causes a parse error in the second stage.
>
> Looking at RowCsvInputFormatTest.java, I noticed that it expects input
> lines with a trailing delimiter, e.g. "a|b|c|". Meanwhile, the CsvTableSink
> creates rows in the form of "a|b|c". As long as 'c' is present, this input
> does get successfully parsed by the RowCsvInputFormat. However, if 'c' is
> defined as a number and missing, e.g. the row is "a|b|", the Number parser
> will fail on the empty string.
>
> Is there something I am missing, or is there, in fact, an inconsistency
> between the TableSink and the InputFormat?
