I think this might be a bug in `tableEnv.fromValues`.

Could you try to remove the DataType parameter, and let the framework
derive the types?

final Table inputTable = tableEnv.fromValues(
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L, "Ciao"));

Best,
Jark


On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Flavio
>
> I reproduced your issue, and I think it is a bug. But I’m not sure whether
> it comes from Calcite itself or from the Calcite shaded into Flink (the
> Flink Table Planner module shades Calcite).
>
> Maybe Danny can help explain more.
>
> CC: Danny
>
> Best
> Leonard Xu
>
> On 14 Jul 2020, at 23:06, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> If I use
>
> final Table inputTable = tableEnv.fromValues(
>         DataTypes.ROW(
>             DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
>             DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>         ), ..
>   tableEnv.executeSql(//
>         "CREATE TABLE `out` (" +
>             "col1 STRING," +
>             "col2 STRING" +
>             ") WITH (...)
>
> the job works as expected, but IMHO this is wrong
> because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
> If I have DataTypes.STRING().notNull(), the type in the CREATE TABLE should
> be "STRING NOT NULL". Am I correct?
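>
> To make it concrete, this is the DDL I would expect to be required when the
> row type uses notNull() (just a sketch; the WITH clause is elided as in my
> snippet above):
>
> ```sql
> CREATE TABLE `out` (
>     col1 STRING NOT NULL,
>     col2 STRING NOT NULL
> ) WITH (...)
> ```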
>
> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Sorry, obviously the " 'format' = 'parquet',\n" + line should not be commented out :D
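>>
>> For reference, this is the DDL as intended, with the format line restored
>> (a sketch; the path is a placeholder for the TEST_FOLDER constant used in
>> the code below):
>>
>> ```sql
>> CREATE TABLE `out` (
>>     col1 STRING,
>>     col2 STRING
>> ) WITH (
>>     'connector' = 'filesystem',
>>     'format' = 'parquet',
>>     'update-mode' = 'append',
>>     'path' = 'file://<TEST_FOLDER>',
>>     'sink.shuffle-by-partition.enable' = 'true'
>> )
>> ```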
>>
>> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Hi to all,
>>> I'm trying to test writing to Parquet using the following code, but I get
>>> an error:
>>>
>>>  final TableEnvironment tableEnv =
>>> DatalinksExecutionEnvironment.getBatchTableEnv();
>>>     final Table inputTable = tableEnv.fromValues(//
>>>         DataTypes.ROW(//
>>>             DataTypes.FIELD("col1", DataTypes.STRING()), //
>>>             DataTypes.FIELD("col2", DataTypes.STRING())//
>>>         ), //
>>>         Row.of(1L, "Hello"), //
>>>         Row.of(2L, "Hello"), //
>>>         Row.of(3L, ""), //
>>>         Row.of(4L, "Ciao"));
>>>     tableEnv.createTemporaryView("ParquetDataset", inputTable);
>>>     tableEnv.executeSql(//
>>>         "CREATE TABLE `out` (\n" + //
>>>             "col1 STRING,\n" + //
>>>             "col2 STRING\n" + //
>>>             ") WITH (\n" + //
>>>             " 'connector' = 'filesystem',\n" + //
>>>             // " 'format' = 'parquet',\n" + //
>>>             " 'update-mode' = 'append',\n" + //
>>>             " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
>>>             " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
>>>             ")");
>>>
>>>     tableEnv.executeSql("INSERT INTO `out` SELECT * FROM
>>> ParquetDataset");
>>>
>>> ---------------------------------
>>>
>>> Exception in thread "main" java.lang.AssertionError: Conversion to
>>> relational algebra failed to preserve datatypes:
>>> validated type:
>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1,
>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
>>> converted type:
>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1,
>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
>>> rel:
>>> LogicalProject(col1=[$0], col2=[$1])
>>>   LogicalUnion(all=[true])
>>>     LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"])
>>>       LogicalValues(tuples=[[{ 0 }]])
>>>     LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"])
>>>       LogicalValues(tuples=[[{ 0 }]])
>>>     LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"])
>>>       LogicalValues(tuples=[[{ 0 }]])
>>>     LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"])
>>>       LogicalValues(tuples=[[{ 0 }]])
>>>
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
>>> at
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>> at
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
>>> at
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>
>>>
>>> What is wrong with my code?
>>>
>>
>>
>