Hi all,
I am using Flink 1.10.0 and have created a table under a Hive catalog that maps a Kafka topic:
CREATE TABLE x.log.yanfa_log (
dt TIMESTAMP(3),
conn_id STRING,
sequence STRING,
trace_id STRING,
span_info STRING,
service_id STRING,
msg_id STRING,
servicename STRING,
ret_code STRING,
duration STRING,
req_body MAP<STRING,STRING>,
res_body MAP<STRING,STRING>,
extra_info MAP<STRING,STRING>,
WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'x-log-yanfa_log',
'connector.properties.bootstrap.servers' = '******:9092',
'connector.properties.zookeeper.connect' = '******:2181',
'connector.properties.group.id' = 'testGroup',
'connector.startup-mode' = 'group-offsets',
'update-mode' = 'append',
'format.type' = 'json',
'format.fail-on-missing-field' = 'true'
);
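In case it matters, this is roughly how the DDL was executed (a hypothetical sketch, not copied from my job; it assumes a TableEnvironment named tEnv and the statement above stored in a String named ddl):

// Hypothetical sketch of running the CREATE TABLE statement in Flink 1.10.
// Assumes `tEnv` is a TableEnvironment and `ddl` holds the statement above.
Catalog hive = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
tEnv.registerCatalog("x", hive);
tEnv.sqlUpdate(ddl); // in 1.10 sqlUpdate() also accepts DDL; executeSql() only arrives in 1.11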
The program that consumes table x.log.yanfa_log is as follows:
Catalog myCatalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
tEnv.registerCatalog("x", myCatalog);
Table rs = tEnv.sqlQuery("select * from x.log.yanfa_log");
tEnv.toAppendStream(rs, Row.class).print();
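For completeness, the setup around that snippet looks roughly like this (a sketch of boilerplate I omitted; the variable names env/tEnv and the planner settings are my assumptions, not quoted from the job):

// Assumed surrounding boilerplate, omitted from the snippet above:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// ... catalog registration, sqlQuery and toAppendStream as shown above ...
env.execute("consume x.log.yanfa_log"); // needed so the print() sink actually runs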
Then I started 2 jobs from this same program, and both printed the same records. My question is: isn't each partition of a Kafka topic supposed to be consumed by at most one consumer within the same consumer group? Why do the 2 jobs output identical results?
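To make the expectation concrete, this is the vanilla-Kafka behavior I had in mind (a sketch using the plain Kafka Java client, not Flink; broker address and topic are the placeholders from above): when two instances of this program run with the same group.id, the group coordinator splits the partitions between them, so no record is delivered to both.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupSemanticsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "******:9092"); // placeholder broker from the DDL above
        props.put("group.id", "testGroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Run this program twice with the same group.id: Kafka rebalances the
        // topic's partitions across the two instances, so each partition is
        // consumed by at most one instance of the group.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("x-log-yanfa_log"));
        while (true) {
            // poll(Duration) assumes kafka-clients >= 2.0; older clients use poll(long)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
            }
        }
    }
}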