hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(), -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址
'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址
'format.type' = 'json'-- 数据源格式为 json
);
|
在查询时select * from user_behavior;报错如下:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve
datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME)
ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME)
ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3],
ts=[$4], proctime=[$5])
LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3],
ts=[$4], proctime=[PROCTIME()])
LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
flink版本:1.10.1
blink planner,streaming model
Thx
| |
Sun.Zhu
|
|
[email protected]
|
签名由网易邮箱大师定制