Following the demo on Jark's blog, I wrote a Table API/SQL program, and it fails when converting the Table to an append stream.
Flink version: 1.10
The code is as follows:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // The old planner will be removed in a future version, so use the Blink planner.
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    // Register a Kafka-backed source table through DDL.
    tableEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
            "    user_id BIGINT,\n" +
            "    item_id BIGINT,\n" +
            "    category_id BIGINT,\n" +
            "    behavior STRING,\n" +
            "    ts TIMESTAMP(3),\n" +
            "    proctime as PROCTIME(),   -- computed column that produces a processing-time column\n" +
            "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- define a watermark on ts; ts becomes the event-time column\n" +
            ") WITH (\n" +
            "    'connector.type' = 'kafka',  -- use the kafka connector\n" +
            "    'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and later\n" +
            "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
            "    'connector.startup-mode' = 'earliest-offset',  -- start reading from the earliest offset\n" +
            "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address\n" +
            "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address\n" +
            "    'format.type' = 'json'  -- the source data format is json\n" +
            ")");
    Table table1 = tableEnv.sqlQuery("select user_id,item_id,category_id,behavior,ts," +
            "proctime from user_behavior where behavior='buy'");
    // This conversion is the call that throws the ValidationException shown below.
    tableEnv.toAppendStream(table1, Behavior.class).print();
    env.execute();
}

public class Behavior {
    public Long user_id;
    public Long item_id;
    public Long category_id;
    public String behavior;
    public Timestamp ts;
    public Timestamp proctime;


    @Override
    public String toString() {
        return "Behavior{" +
                "user_id=" + user_id +
                ", item_id=" + item_id +
                ", category_id=" + category_id +
                ", behavior='" + behavior + '\'' +
                ", ts=" + ts +
                ", proctime=" + proctime +
                '}';
    }
}
The error is as follows:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
Query schema: [user_id: BIGINT, item_id: BIGINT, category_id: BIGINT, behavior: STRING, ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT NULL *PROCTIME*]
Sink schema: [behavior: STRING, category_id: BIGINT, item_id: BIGINT, proctime: TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT]
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
        at sql.KafkaSourceTable.main(KafkaSourceTable.java:35)

The POJO's field types match the column types of the source table,
so why are NOT NULL *PROCTIME* and *ROWTIME* still being validated?
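
Besides the time-attribute markers, the two schema lines in the message also differ in field order: the sink schema is listed alphabetically, while the query schema follows the SELECT order. The sketch below is plain Java with no Flink dependency. It only compares the field names copied from the error message, first position by position and then ignoring order. The class name SchemaOrderCheck and its methods are made up for illustration, and whether the ordering difference is what actually triggers the exception is an assumption, not something this post confirms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SchemaOrderCheck {
    // Field names copied from the "Query schema" line of the error message.
    public static final List<String> QUERY = Arrays.asList(
            "user_id", "item_id", "category_id", "behavior", "ts", "proctime");
    // Field names copied from the "Sink schema" line of the error message.
    public static final List<String> SINK = Arrays.asList(
            "behavior", "category_id", "item_id", "proctime", "ts", "user_id");

    /** Returns the indexes at which the two name lists differ when compared by position. */
    public static List<Integer> positionalMismatches(List<String> a, List<String> b) {
        List<Integer> mismatches = new ArrayList<>();
        int n = Math.max(a.size(), b.size());
        for (int i = 0; i < n; i++) {
            String x = i < a.size() ? a.get(i) : null;
            String y = i < b.size() ? b.get(i) : null;
            if (x == null || !x.equals(y)) {
                mismatches.add(i);
            }
        }
        return mismatches;
    }

    /** Returns true when both lists contain the same names, regardless of order. */
    public static boolean sameNamesIgnoringOrder(List<String> a, List<String> b) {
        List<String> x = new ArrayList<>(a);
        List<String> y = new ArrayList<>(b);
        Collections.sort(x);
        Collections.sort(y);
        return x.equals(y);
    }

    public static void main(String[] args) {
        // Only "ts" sits at the same index in both schemas.
        System.out.println("positional mismatches: " + positionalMismatches(QUERY, SINK));
        System.out.println("same names ignoring order: " + sameNamesIgnoringOrder(QUERY, SINK));
    }
}
```

If the by-position comparison is indeed what the planner does here, then selecting the columns in alphabetical order, or converting to Row instead of a POJO, would be things to try. Both are untested guesses.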
