I think this might be a bug in `tableEnv.fromValues`. Could you try to remove the DataType parameter, and let the framework derive the types?
final Table inputTable = tableEnv.fromValues( Row.of(1L, "Hello"), // Row.of(2L, "Hello"), // Row.of(3L, ""), // Row.of(4L, "Ciao")); Best, Jark On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xbjt...@gmail.com> wrote: > Hi, Flavio > > I reproduced your issue, and I think it should be a bug. But I’m not sure > it comes from Calcite or Flink shaded Calcite, Flink Table Planner module > shaded calcite. > > Maybe Danny can help explain more. > > CC: Danny > > Best > Leonard Xu > > 在 2020年7月14日,23:06,Flavio Pompermaier <pomperma...@okkam.it> 写道: > > If I use > > final Table inputTable = tableEnv.fromValues( > DataTypes.ROW( > DataTypes.FIELD("col1", DataTypes.STRING().notNull()), > DataTypes.FIELD("col2", DataTypes.STRING().notNull()) > ), .. > tableEnv.executeSql(// > "CREATE TABLE `out` (" + > "col1 STRING," + > "col2 STRING" + > ") WITH (...) > > the job works as expected but this is wrong IMHO > because DataTypes.STRING() = DataTypes.STRING().nullable() by default. > If I have DataTypes.STRING().notNull() the type in the CREATE TABLE should > be "STRING NOT NULL" . Am I correct? > > On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <pomperma...@okkam.it> > wrote: > >> Sorry, obviously " 'format' = 'parquet'" + is without comment :D >> >> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <pomperma...@okkam.it> >> wrote: >> >>> Hi to all, >>> I'm trying to test write to parquet using the following code but I have >>> an error: >>> >>> final TableEnvironment tableEnv = >>> DatalinksExecutionEnvironment.getBatchTableEnv(); >>> final Table inputTable = tableEnv.fromValues(// >>> DataTypes.ROW(// >>> DataTypes.FIELD("col1", DataTypes.STRING()), // >>> DataTypes.FIELD("col2", DataTypes.STRING())// >>> ), // >>> Row.of(1L, "Hello"), // >>> Row.of(2L, "Hello"), // >>> Row.of(3L, ""), // >>> Row.of(4L, "Ciao")); >>> tableEnv.createTemporaryView("ParquetDataset", inputTable); >>> tableEnv.executeSql(// >>> "CREATE TABLE `out` (\n" + // >>> "col1 STRING,\n" + // >>> "col2 STRING\n" + // >>> ") WITH (\n" + // >>> " 'connector' = 'filesystem',\n" + // >>> // " 'format' = 'parquet',\n" + // >>> " 'update-mode' = 'append',\n" + // >>> " 'path' = 'file://" + TEST_FOLDER + "',\n" + // >>> " 'sink.shuffle-by-partition.enable' = 'true'\n" + // >>> ")"); >>> >>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM >>> ParquetDataset"); >>> >>> --------------------------------- >>> >>> Exception in thread "main" java.lang.AssertionError: Conversion to >>> relational algebra failed to preserve datatypes: >>> validated type: >>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, >>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL >>> converted type: >>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, >>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL >>> rel: >>> LogicalProject(col1=[$0], col2=[$1]) >>> LogicalUnion(all=[true]) >>> LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET >>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET >>> "UTF-16LE"]) >>> LogicalValues(tuples=[[{ 0 }]]) >>> LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET >>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET >>> "UTF-16LE"]) >>> LogicalValues(tuples=[[{ 0 }]]) >>> LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET >>> "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET >>> "UTF-16LE"]) >>> LogicalValues(tuples=[[{ 0 }]]) >>> LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET >>> "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET >>> "UTF-16LE"]) >>> LogicalValues(tuples=[[{ 0 }]]) >>> >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465) >>> at >>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580) >>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>> <http://org.apache.flink.table.planner.calcite.flinkplannerimpl.org/> >>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) >>> at >>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) >>> at >>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) >>> at >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678) >>> >>> >>> What is wrong with my code? >>> >> >> >