Following the demo on Jark's blog, I wrote a Table API/SQL program, and it fails when converting a Table to an append stream.
Flink version: 1.10
The code is as follows:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class KafkaSourceTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // The old planner will be removed in a later version, so use the Blink planner.
        EnvironmentSettings settings =
            EnvironmentSettings.newInstance().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // Register the Kafka source table; the "--" comments are part of the SQL text.
        tableEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
            " user_id BIGINT,\n" +
            " item_id BIGINT,\n" +
            " category_id BIGINT,\n" +
            " behavior STRING,\n" +
            " ts TIMESTAMP(3),\n" +
            " proctime as PROCTIME(), -- computed column that yields a processing-time attribute\n" +
            " WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- define a watermark on ts; ts becomes the event-time attribute\n" +
            ") WITH (\n" +
            " 'connector.type' = 'kafka', -- use the kafka connector\n" +
            " 'connector.version' = 'universal', -- kafka version; universal supports 0.11 and later\n" +
            " 'connector.topic' = 'user_behavior', -- kafka topic\n" +
            " 'connector.startup-mode' = 'earliest-offset', -- read from the earliest offset\n" +
            " 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper address\n" +
            " 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address\n" +
            " 'format.type' = 'json' -- source data format is json\n" +
            ")");
        Table table1 = tableEnv.sqlQuery(
            "select user_id,item_id,category_id,behavior,ts," +
            "proctime from user_behavior where behavior='buy'");
        // This conversion is the call that throws the ValidationException.
        tableEnv.toAppendStream(table1, Behavior.class).print();
        env.execute();
    }
}
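// POJO used as the target type of toAppendStream.
// Timestamp is assumed to be java.sql.Timestamp.
public class Behavior {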
public Long user_id;
public Long item_id;
public Long category_id;
public String behavior;
public Timestamp ts;
public Timestamp proctime;
@Override
public String toString() {
return "Behavior{" +
"user_id=" + user_id +
", item_id=" + item_id +
", category_id=" + category_id +
", behavior='" + behavior + '\'' +
", ts=" + ts +
", proctime=" + proctime +
'}';
}
}
The error is as follows:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink do not match.
Query schema: [user_id: BIGINT, item_id: BIGINT, category_id: BIGINT, behavior: STRING, ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT NULL *PROCTIME*]
Sink schema: [behavior: STRING, category_id: BIGINT, item_id: BIGINT, proctime: TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT]
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
    at sql.KafkaSourceTable.main(KafkaSourceTable.java:35)
The POJO's field types match the field types of the source table, so why does validation still check NOT NULL *PROCTIME* and *ROWTIME*?
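
To make the mismatch easier to see, the two schemas from the error message can be compared by field name rather than by position, since the sink schema in the error lists the fields in alphabetical order while the query schema follows the select order. The following is only a minimal sketch in plain Java with no Flink dependency: `SchemaDiff`, `parse`, and `diff` are hypothetical helper names invented for illustration, and the parser assumes that no type string contains a comma. It does not reproduce Flink's own validation logic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Flink): compares two schema strings by field name.
public class SchemaDiff {

    // Parses "[a: T1, b: T2]" into an insertion-ordered map of field name -> type string.
    // Assumption: no type string contains a comma (true for the schemas in this error).
    static Map<String, String> parse(String schema) {
        Map<String, String> fields = new LinkedHashMap<>();
        String body = schema.trim();
        body = body.substring(1, body.length() - 1); // drop the surrounding [ ]
        for (String part : body.split(",\\s*")) {
            int colon = part.indexOf(':');
            fields.put(part.substring(0, colon).trim(), part.substring(colon + 1).trim());
        }
        return fields;
    }

    // Returns one line per query field whose type string differs from the sink's,
    // matching fields by name so that field order does not matter.
    static List<String> diff(String querySchema, String sinkSchema) {
        Map<String, String> query = parse(querySchema);
        Map<String, String> sink = parse(sinkSchema);
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : query.entrySet()) {
            String sinkType = sink.get(e.getKey());
            if (!e.getValue().equals(sinkType)) {
                out.add(e.getKey() + ": query=" + e.getValue() + " sink=" + sinkType);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Schema strings copied from the ValidationException above.
        String query = "[user_id: BIGINT, item_id: BIGINT, category_id: BIGINT, behavior: STRING, "
            + "ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT NULL *PROCTIME*]";
        String sink = "[behavior: STRING, category_id: BIGINT, item_id: BIGINT, proctime: TIMESTAMP(3), "
            + "ts: TIMESTAMP(3), user_id: BIGINT]";
        diff(query, sink).forEach(System.out::println);
    }
}
```

Matched by name, only `ts` and `proctime` differ: the query side carries the `*ROWTIME*` and `NOT NULL *PROCTIME*` markers, while the sink side derived from the POJO shows plain `TIMESTAMP(3)`. The four other fields have identical types on both sides.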