OK, I'll give it a try. Thanks!



Sun.Zhu

On 2020-05-04 11:22, Jark Wu wrote:
This looks like a bug that has already been fixed (FLINK-16108).
Could you try again with release-1.10.1, which is currently in RC?
https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc2/

Best,
Jark

On Mon, 4 May 2020 at 01:01, 祝尚 <[email protected]> wrote:

> Following the demo in Jark's blog, I wrote a Table API/SQL program, and it
> fails when converting the Table to an append stream.
> Flink version: 1.10
> The code is as follows:
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     // the old planner will be removed in a future version
>     EnvironmentSettings settings =
>             EnvironmentSettings.newInstance().useBlinkPlanner().build();
>     StreamTableEnvironment tableEnv =
>             StreamTableEnvironment.create(env, settings);
>     tableEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
>             "    user_id BIGINT,\n" +
>             "    item_id BIGINT,\n" +
>             "    category_id BIGINT,\n" +
>             "    behavior STRING,\n" +
>             "    ts TIMESTAMP(3),\n" +
>             "    proctime as PROCTIME(),   -- computed column that produces a processing-time column\n" +
>             "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- define a watermark on ts; ts becomes the event-time column\n" +
>             ") WITH (\n" +
>             "    'connector.type' = 'kafka',  -- use the kafka connector\n" +
>             "    'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above\n" +
>             "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
>             "    'connector.startup-mode' = 'earliest-offset',  -- start reading from the earliest offset\n" +
>             "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address\n" +
>             "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address\n" +
>             "    'format.type' = 'json'  -- source data format is json\n" +
>             ")");
>     Table table1 = tableEnv.sqlQuery(
>             "select user_id,item_id,category_id,behavior,ts," +
>             "proctime from user_behavior where behavior='buy'");
>     tableEnv.toAppendStream(table1, Behavior.class).print();
>     env.execute();
> }
>
> public class Behavior {
>     public Long user_id;
>     public Long item_id;
>     public Long category_id;
>     public String behavior;
>     public Timestamp ts;
>     public Timestamp proctime;
>
>
>     @Override
>     public String toString() {
>         return "Behavior{" +
>                 "user_id=" + user_id +
>                 ", item_id=" + item_id +
>                 ", category_id=" + category_id +
>                 ", behavior='" + behavior + '\'' +
>                 ", ts=" + ts +
>                 ", proctime=" + proctime +
>                 '}';
>     }
> }
> The error is as follows:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
> Query schema: [user_id: BIGINT, item_id: BIGINT, category_id: BIGINT, behavior: STRING, ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT NULL *PROCTIME*]
> Sink schema: [behavior: STRING, category_id: BIGINT, item_id: BIGINT, proctime: TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT]
>         at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
>         at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
>         at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>         at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
>         at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>         at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>         at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>         at sql.KafkaSourceTable.main(KafkaSourceTable.java:35)
>
> The POJO's field types match the field types of the source table,
> so why are NOT NULL *PROCTIME* and *ROWTIME* still being validated?
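
To make the mismatch in the exception easier to see, here is a minimal sketch that compares the two schema strings from the error message field by field. It is plain Java with no Flink dependency and is not Flink's own validation logic; the class name `SchemaDiff` and the helpers `parse` and `diff` are hypothetical names chosen for illustration. Fields are matched by name because the sink schema in the message lists the POJO fields in alphabetical order rather than in query order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: a textual diff of the two schemas printed in the
// ValidationException. This is NOT how Flink validates sink schemas.
class SchemaDiff {
    // Schema strings copied from the error message in this thread.
    static final String QUERY =
        "user_id: BIGINT, item_id: BIGINT, category_id: BIGINT, behavior: STRING, "
        + "ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT NULL *PROCTIME*";
    static final String SINK =
        "behavior: STRING, category_id: BIGINT, item_id: BIGINT, "
        + "proctime: TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT";

    // Parse "name: TYPE, name: TYPE, ..." into a map sorted by field name.
    static Map<String, String> parse(String schema) {
        Map<String, String> fields = new TreeMap<>();
        for (String part : schema.split(",\\s*")) {
            String[] nameAndType = part.split(":\\s*", 2);
            fields.put(nameAndType[0].trim(), nameAndType[1].trim());
        }
        return fields;
    }

    // Report every query field whose printed type differs from the sink's.
    static List<String> diff(String query, String sink) {
        Map<String, String> sinkFields = parse(sink);
        List<String> mismatches = new ArrayList<>();
        for (Map.Entry<String, String> field : parse(query).entrySet()) {
            String sinkType = sinkFields.get(field.getKey());
            if (!field.getValue().equals(sinkType)) {
                mismatches.add(field.getKey() + ": query=" + field.getValue()
                    + ", sink=" + sinkType);
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        diff(QUERY, SINK).forEach(System.out::println);
    }
}
```

Run against the message above, the sketch reports only `proctime` and `ts`: the four BIGINT/STRING fields agree once matched by name, and the two time fields differ only in the `NOT NULL`, `*PROCTIME*` and `*ROWTIME*` annotations that the question asks about.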
