Hi Cliff,
You are right.
The CsvTableSink and the CsvInputFormat are not in sync. It would be great
if you could open a JIRA ticket for this issue.
As a workaround, you could implement your own CsvTableSink to add a
delimiter after the last field.
The code is straightforward: fewer than 150 lines of simple Scala code [1] (see
CsvFormatter at the end of the file).
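To make the workaround concrete, here is a minimal, Flink-free sketch of the formatting logic such a custom sink could use. It assumes a row is given as a plain Seq[Any] instead of Flink's Row, and the name TrailingDelimiterFormatter is hypothetical. In a real job the same loop would sit in the map method of the sink's formatter. The only change from plain joining is that every field, including the last one, is followed by the delimiter, and null fields are written as empty strings.

```scala
// Hypothetical, Flink-free sketch: format a row so that every field,
// including the last one, is terminated by the field delimiter ("a|b|c|").
// A real custom CsvTableSink would do this inside its Row -> String mapper.
object TrailingDelimiterFormatter {

  /** Formats one row; null fields are written as empty strings. */
  def format(fields: Seq[Any], fieldDelim: String): String = {
    val builder = new StringBuilder
    for (field <- fields) {
      if (field != null) {
        builder.append(field.toString)
      }
      // Always append the delimiter, so the last field is terminated too.
      builder.append(fieldDelim)
    }
    builder.toString
  }

  def main(args: Array[String]): Unit = {
    println(format(Seq("a", "b", 3), "|"))    // a|b|3|
    println(format(Seq("a", "b", null), "|")) // a|b||
  }
}
```

With this format a trailing null no longer produces a line that ends right after the previous field's value; the empty last field is explicitly terminated, matching the "a|b|c|" shape used in RowCsvInputFormatTest.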
Best, Fabian
[1]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
2017-12-22 21:34 GMT+01:00 Cliff Resnick :
> I've been trying out the Table API for some ETL using a two-stage job of
> CsvTableSink (DataSet) -> CsvInputFormat (Stream). I ran into an issue
> where the first stage produces output with trailing null values (valid),
> which causes a parse error in the second stage.
>
> Looking at RowCsvInputFormatTest.java, I noticed that it expects input
> lines with a trailing delimiter, e.g. "a|b|c|". Meanwhile, the CsvTableSink
> creates rows in the form of "a|b|c". As long as 'c' is present, this input
> does get successfully parsed by the RowCsvInputFormat. However, if 'c' is
> defined as a number and missing, e.g. the row is "a|b|", the Number parser
> will fail on the empty string.
>
> Is there something I am missing, or is there, in fact, an inconsistency
> between the TableSink and the InputFormat?
>
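The failure described in the quoted message can be modeled with a small, simplified sketch. It does not reuse Flink's CsvTableSink or RowCsvInputFormat code; the object CsvMismatchDemo and its methods are hypothetical and only reproduce the shape of the problem: the first stage joins fields without a trailing delimiter, so a row (a, b, null) becomes "a|b|", and parsing the empty last field as a number fails.

```scala
import java.util.regex.Pattern
import scala.util.Try

// Simplified, hypothetical model of the two stages (not Flink code).
object CsvMismatchDemo {

  // Stage 1 model: join fields with the delimiter, no trailing delimiter,
  // writing null as an empty string. (a, b, null) -> "a|b|"
  def writeRow(fields: Seq[Any], delim: String): String =
    fields.map(f => if (f == null) "" else f.toString).mkString(delim)

  // Stage 2 model: parse the last field as an Int.
  // Limit -1 keeps trailing empty strings: "a|b|" -> Array("a", "b", "").
  def parseLastInt(line: String, delim: String): Try[Int] = Try {
    val fields = line.split(Pattern.quote(delim), -1)
    fields.last.toInt // "".toInt throws NumberFormatException
  }

  def main(args: Array[String]): Unit = {
    val ok = writeRow(Seq("a", "b", 3), "|")     // "a|b|3"
    val bad = writeRow(Seq("a", "b", null), "|") // "a|b|"
    println(parseLastInt(ok, "|"))  // Success(3)
    println(parseLastInt(bad, "|")) // a Failure wrapping NumberFormatException
  }
}
```

Pattern.quote is needed because String.split takes a regular expression, and an unquoted "|" would be treated as an empty alternation.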