Hi,

Indeed, the topic has three partitions in total and only one of them had data. The problem is solved, thanks.

Best,
Junbao Zhang
________________________________
From: Leonard Xu <xbjt...@gmail.com>
Sent: July 13, 2020 11:57
To: user-zh <user-zh@flink.apache.org>
Subject: Re: flink1.10.1: Flink SQL consuming Kafka produces no watermark when parallelism is greater than 1

Hi,

First check how many partitions the Kafka topic has, and whether every partition has data.

Best,
Leonard Xu

> On July 13, 2020, at 11:46, wind.fly....@outlook.com wrote:
>
> Hi, all:
> I am using Flink 1.10.1 with Flink SQL consuming from Kafka. The job runs
> normally with parallelism 1, but after changing parallelism to 2, the
> watermark metric no longer shows up on the yarn-session web page and no
> results are produced. The SQL is as follows:
>
> insert into
>   x.report.bi_report_fence_common_indicators
> select
>   fence_id,
>   'finishedOrderCnt' as indicator_name,
>   TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
>   count(1) as indicator_val
> from
>   (
>     select
>       dt,
>       fence_id,
>       fence_coordinates_array,
>       c.driver_location
>     from
>       (
>         select
>           *
>         from
>           (
>             select
>               dt,
>               driver_location,
>               r1.f1.fence_info as fence_info
>             from
>               (
>                 select
>                   o.dt,
>                   o.driver_location,
>                   MD5(r.city_code) as k,
>                   PROCTIME() as proctime
>                 from
>                   (
>                     select
>                       order_no,
>                       dt,
>                       driver_location,
>                       PROCTIME() as proctime
>                     from
>                       x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
>                     where
>                       _type = 'insert'
>                       and event_code = 'arriveAndSettlement'
>                   ) o
>                   LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
>               ) o1
>               LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
>           ) a
>         where
>           fence_info is not null
>       ) c
>       LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
>   ) as b
> where
>   in_fence(fence_coordinates_array, driver_location)
> group by
>   TUMBLE(dt, INTERVAL '5' MINUTE),
>   fence_id;
>
> In the table x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner, dt is
> the watermark field. The CREATE TABLE statement is as follows:
>
> CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
>   _type STRING,
>   _old_id BIGINT,
>   id BIGINT,
>   _old_order_no STRING,
>   order_no STRING,
>   _old_event_code STRING,
>   event_code STRING,
>   _old_from_state TINYINT,
>   from_state TINYINT,
>   _old_to_state TINYINT,
>   to_state TINYINT,
>   _old_operator_type TINYINT,
>   operator_type TINYINT,
>   _old_passenger_location STRING,
>   passenger_location STRING,
>   _old_driver_location STRING,
>   driver_location STRING,
>   _old_trans_time STRING,
>   trans_time STRING,
>   _old_create_time STRING,
>   create_time STRING,
>   _old_update_time STRING,
>   update_time STRING,
>   _old_passenger_poi_address STRING,
>   passenger_poi_address STRING,
>   _old_passenger_detail_address STRING,
>   passenger_detail_address STRING,
>   _old_driver_poi_address STRING,
>   driver_poi_address STRING,
>   _old_driver_detail_address STRING,
>   driver_detail_address STRING,
>   _old_operator STRING,
>   operator STRING,
>   _old_partition_index TINYINT,
>   partition_index TINYINT,
>   dt as TO_TIMESTAMP(trans_time),
>   WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.properties.bootstrap.servers' = '*',
>   'connector.properties.zookeeper.connect' = '*',
>   'connector.version' = 'universal',
>   'format.type' = 'json',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'group-offsets',
>   'connector.topic' = 'xxxxx'
> )
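
The cause found in this thread (three partitions, only one with data) can be illustrated with a small simulation. It is only a sketch of the general rule that a downstream operator's watermark is the minimum of the watermarks from its input subtasks. It is not Flink code. The function names are hypothetical, the round-robin partition assignment is a simplification of how the Kafka connector distributes partitions, and it assumes each source subtask derives its watermark from the largest event time it has read minus the 5-second delay from the DDL.

```python
from typing import Dict, List, Optional

# Stands in for the initial "no watermark yet" value (Long.MIN_VALUE in Flink).
NO_WATERMARK = float("-inf")


def assign_partitions(num_partitions: int, parallelism: int) -> Dict[int, List[int]]:
    """Hypothetical round-robin assignment of Kafka partitions to source subtasks."""
    assignment: Dict[int, List[int]] = {s: [] for s in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment


def subtask_watermark(
    partitions: List[int],
    max_event_time: Dict[int, Optional[float]],
    delay: float,
) -> float:
    """Watermark of one subtask: largest event time read so far minus the delay.

    A subtask that has read no records at all never emits a watermark.
    """
    seen = [max_event_time[p] for p in partitions if max_event_time.get(p) is not None]
    if not seen:
        return NO_WATERMARK
    return max(seen) - delay


def downstream_watermark(
    num_partitions: int,
    parallelism: int,
    max_event_time: Dict[int, Optional[float]],
    delay: float = 5.0,
) -> float:
    """The window operator advances only to the minimum over all input subtasks."""
    assignment = assign_partitions(num_partitions, parallelism)
    return min(
        subtask_watermark(parts, max_event_time, delay)
        for parts in assignment.values()
    )


if __name__ == "__main__":
    only_partition_0_has_data = {0: 1000.0}
    # Parallelism 1: the single subtask reads partition 0, so the watermark advances.
    print(downstream_watermark(3, 1, only_partition_0_has_data))
    # Parallelism 2: one subtask reads only an empty partition and holds the minimum back.
    print(downstream_watermark(3, 2, only_partition_0_has_data))
```

Under these assumptions, the second call never leaves the initial value, which matches the symptom reported above: no watermark metric and no window output once parallelism exceeds 1.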