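Hi,
1. Version: Flink 1.13.1
When a query selects row(a,b) and the resulting table is then queried again, an exception is thrown; this looks like a bug.
[Note: the original non-ASCII text of this message was lost to mis-encoding; the wording is reconstructed from the surviving words and from the code and stack trace included in the message.]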
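2. Suggestion
The row constructor does not allow the fields of the row to be named: they receive generated names (EXPR$0, EXPR$1). It would be helpful if names could be specified for the fields of a row.
[Note: the original non-ASCII text was lost to mis-encoding; this wording is a reconstruction.]
The code to reproduce this is as follows: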
package test;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ResolvedSchema;
public class DataGenTest {
public static void main(String[] args) {
StreamExecutionEnvironment
streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(streamExecutionEnvironment);
tableEnvironment.executeSql("CREATE TABLE datagen
(\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str
STRING,\n" +
" ts AS
localtimestamp,\n" +
" WATERMARK FOR ts AS
ts\n" +
") WITH (\n" +
" 'connector' =
'datagen',\n" +
"
'rows-per-second'='5',\n" +
"
'fields.f_sequence.kind'='sequence',\n" +
"
'fields.f_sequence.start'='1',\n" +
"
'fields.f_sequence.end'='1000',\n" +
"
'fields.f_random.min'='1',\n" +
"
'fields.f_random.max'='1000',\n" +
"
'fields.f_random_str.length'='10'\n" +
")");
Table table = tableEnvironment.sqlQuery("select
row(f_sequence, f_random) as c from datagen");
ResolvedSchema resolvedSchema =
table.getResolvedSchema();
System.out.println(resolvedSchema);
/**
* ??????????
* (
* `c` ROW<`EXPR$0` INT, `EXPR$1`
INT> NOT NULL
* )
*
????????????????row????????????????????row????????????????????row??????????????????????????c1,
c2??
* (
* `c` ROW<`c1` INT, `c2` INT>
NOT NULL
* )
*/
Table table1 = tableEnvironment.sqlQuery("select *
from " + table);
/**
* ????????????sql??????
* Exception in thread "main"
java.lang.AssertionError: Conversion to relational algebra failed to preserve
datatypes:
* validated type:
*
RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL
c) NOT NULL
* converted type:
* RecordType(RecordType(INTEGER EXPR$0,
INTEGER EXPR$1) NOT NULL c) NOT NULL
* rel:
* LogicalProject(c=[ROW($0, $1)])
*
LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
*
LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2],
ts=[LOCALTIMESTAMP])
*
LogicalTableScan(table=[[default_catalog, default_database, datagen]])
*/
ResolvedSchema resolvedSchema1 =
table1.getResolvedSchema();
System.out.println(resolvedSchema1);
table.execute().print();
}
}