Got it!
Thx,junbao

| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 09:32,zhangjunbao<wind.fly....@outlook.com> 写道:
1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
https://issues.apache.org/jira/browse/FLINK-17189 
<https://issues.apache.org/jira/browse/FLINK-17189>

Best,
Junbao Zhang

2020年6月13日 上午12:31,Sun.Zhu <17626017...@163.com> 写道:

hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka',  -- 使用 kafka connector
'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址
'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
'format.type' = 'json'-- 数据源格式为 json
);
|
在查询时select * from user_behavior;报错如下:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) 
ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[$5])
LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], 
ts=[$4], proctime=[PROCTIME()])
LogicalTableScan(table=[[myhive, my_db, user_behavior, source: 
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


flink版本:1.10.1
blink planner,streaming model


Thx
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


回复