If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things works if I
change also the query to "INSERT INTO `out` SELECT CAST(f0 AS STRING) ,f1
FROM ParquetDataset".
If there is still a bug fill a proper JIRA ticket with the exact
description of the problem..

Just to conclude this thread there are 2 strange things I found:

1) Is LONG really not supported yet? If I use as output table LONG,STRING I
get
      Exception in thread "main" java.lang.UnsupportedOperationException:
class org.apache.calcite.sql.SqlIdentifier: LONG
      at org.apache.calcite.util.Util.needToImplement(Util.java:967)

2) The new planner translates STRING to VARCHAR(2147483647). Is it correct?

Best,
Flavio


On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <imj...@gmail.com> wrote:

> I think this might be a bug in `tableEnv.fromValues`.
>
> Could you try to remove the DataType parameter, and let the framework
> derive the types?
>
> final Table inputTable = tableEnv.fromValues(
>         Row.of(1L, "Hello"), //
>         Row.of(2L, "Hello"), //
>         Row.of(3L, ""), //
>         Row.of(4L, "Ciao"));
>
> Best,
> Jark
>
>
> On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xbjt...@gmail.com> wrote:
>
>> Hi, Flavio
>>
>> I reproduced your issue, and I think it should be a bug. But I’m not sure
>> it comes from Calcite or Flink shaded Calcite, Flink Table Planner module
>> shaded calcite.
>>
>> Maybe Danny can help explain more.
>>
>> CC: Danny
>>
>> Best
>> Leonard Xu
>>
>> 在 2020年7月14日,23:06,Flavio Pompermaier <pomperma...@okkam.it> 写道:
>>
>> If I use
>>
>> final Table inputTable = tableEnv.fromValues(
>>         DataTypes.ROW(
>>             DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
>>             DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>>         ), ..
>>   tableEnv.executeSql(//
>>         "CREATE TABLE `out` (" +
>>             "col1 STRING," +
>>             "col2 STRING" +
>>             ") WITH (...)
>>
>> the job works as expected but this is wrong IMHO
>> because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
>> If I have DataTypes.STRING().notNull() the type in the CREATE TABLE
>> should be "STRING NOT NULL" . Am I correct?
>>
>> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Sorry, obviously  " 'format' = 'parquet'" + is without comment :D
>>>
>>> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Hi to all,
>>>> I'm trying to test write to parquet using the following code but I have
>>>> an error:
>>>>
>>>>  final TableEnvironment tableEnv =
>>>> DatalinksExecutionEnvironment.getBatchTableEnv();
>>>>     final Table inputTable = tableEnv.fromValues(//
>>>>         DataTypes.ROW(//
>>>>             DataTypes.FIELD("col1", DataTypes.STRING()), //
>>>>             DataTypes.FIELD("col2", DataTypes.STRING())//
>>>>         ), //
>>>>         Row.of(1L, "Hello"), //
>>>>         Row.of(2L, "Hello"), //
>>>>         Row.of(3L, ""), //
>>>>         Row.of(4L, "Ciao"));
>>>>     tableEnv.createTemporaryView("ParquetDataset", inputTable);
>>>>     tableEnv.executeSql(//
>>>>         "CREATE TABLE `out` (\n" + //
>>>>             "col1 STRING,\n" + //
>>>>             "col2 STRING\n" + //
>>>>             ") WITH (\n" + //
>>>>             " 'connector' = 'filesystem',\n" + //
>>>>             // " 'format' = 'parquet',\n" + //
>>>>             " 'update-mode' = 'append',\n" + //
>>>>             " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
>>>>             " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
>>>>             ")");
>>>>
>>>>     tableEnv.executeSql("INSERT INTO `out` SELECT * FROM
>>>> ParquetDataset");
>>>>
>>>> ---------------------------------
>>>>
>>>> Exception in thread "main" java.lang.AssertionError: Conversion to
>>>> relational algebra failed to preserve datatypes:
>>>> validated type:
>>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1,
>>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
>>>> converted type:
>>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1,
>>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
>>>> rel:
>>>> LogicalProject(col1=[$0], col2=[$1])
>>>>   LogicalUnion(all=[true])
>>>>     LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"])
>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>     LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"])
>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>     LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"])
>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>     LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"])
>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>> <http://org.apache.flink.table.planner.calcite.flinkplannerimpl.org/>
>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>>> at
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
>>>> at
>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>
>>>>
>>>> What is wrong with my code?
>>>>
>>>
>>>

Reply via email to