回复: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

2020-07-13 文章 wind.fly....@outlook.com
Hi,
确实是一共三个分区,只有一个分区有数据,已经解决,谢谢。

Best,
Junbao Zhang

发件人: Leonard Xu 
发送时间: 2020年7月13日 11:57
收件人: user-zh 
主题: Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

Hi,

可以先看下 Kafka topic 对应的partition有几个?是否每个分区都有数据。

祝好,
Leonard Xu

> 在 2020年7月13日,11:46,wind.fly@outlook.com 写道:
>
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 
> 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session 
> web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
>  x.report.bi_report_fence_common_indicators
> select
>  fence_id,
>  'finishedOrderCnt' as indicator_name,
>  TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
>  count(1) as indicator_val
> from
>  (
>select
>  dt,
>  fence_id,
>  fence_coordinates_array,
>  c.driver_location
>from
>  (
>select
>  *
>from
>  (
>select
>  dt,
>  driver_location,
>  r1.f1.fence_info as fence_info
>from
>  (
>select
>  o.dt,
>  o.driver_location,
>  MD5(r.city_code) as k,
>  PROCTIME() as proctime
>from
>  (
>select
>  order_no,
>  dt,
>  driver_location,
>  PROCTIME() as proctime
>from
>  x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
>where
>  _type = 'insert'
>  and event_code = 'arriveAndSettlement'
>  ) o
>  LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS 
> OF o.proctime AS r ON r.order_no = o.order_no
>  ) o1
>  LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS 
> r1 ON r1.k = o1.k
>  ) a
>where
>  fence_info is not null
>  ) c
>  LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, 
> fence_coordinates_array) ON TRUE
>  ) as b
> where
>  in_fence(fence_coordinates_array, driver_location)
> group by
>  TUMBLE(dt, INTERVAL '5' MINUTE),
>  fence_id;
>   其中 
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
>   CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
>  _type STRING,
>  _old_id BIGINT,
>  id BIGINT,
>  _old_order_no STRING,
>  order_no STRING,
>  _old_event_code STRING,
>  event_code STRING,
>  _old_from_state TINYINT,
>  from_state TINYINT,
>  _old_to_state TINYINT,
>  to_state TINYINT,
>  _old_operator_type TINYINT,
>  operator_type TINYINT,
>  _old_passenger_location STRING,
>  passenger_location STRING,
>  _old_driver_location STRING,
>  driver_location STRING,
>  _old_trans_time STRING,
>  trans_time STRING,
>  _old_create_time STRING,
>  create_time STRING,
>  _old_update_time STRING,
>  update_time STRING,
>  _old_passenger_poi_address STRING,
>  passenger_poi_address STRING,
>  _old_passenger_detail_address STRING,
>  passenger_detail_address STRING,
>  _old_driver_poi_address STRING,
>  driver_poi_address STRING,
>  _old_driver_detail_address STRING,
>  driver_detail_address STRING,
>  _old_operator STRING,
>  operator STRING,
>  _old_partition_index TINYINT,
>  partition_index TINYINT,
>  dt as TO_TIMESTAMP(trans_time),
>  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.properties.bootstrap.servers' = '*',
>  'connector.properties.zookeeper.connect' = '*',
>  'connector.version' = 'universal',
>  'format.type' = 'json',
>  'connector.properties.group.id' = 'testGroup',
>  'connector.startup-mode' = 'group-offsets',
>  'connector.topic' = 'x'
> )



Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

2020-07-12 文章 Leonard Xu
Hi,

可以先看下 Kafka topic 对应的partition有几个?是否每个分区都有数据。

祝好,
Leonard Xu

> 在 2020年7月13日,11:46,wind.fly@outlook.com 写道:
> 
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 
> 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session 
> web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
>  x.report.bi_report_fence_common_indicators
> select
>  fence_id,
>  'finishedOrderCnt' as indicator_name,
>  TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
>  count(1) as indicator_val
> from
>  (
>select
>  dt,
>  fence_id,
>  fence_coordinates_array,
>  c.driver_location
>from
>  (
>select
>  *
>from
>  (
>select
>  dt,
>  driver_location,
>  r1.f1.fence_info as fence_info
>from
>  (
>select
>  o.dt,
>  o.driver_location,
>  MD5(r.city_code) as k,
>  PROCTIME() as proctime
>from
>  (
>select
>  order_no,
>  dt,
>  driver_location,
>  PROCTIME() as proctime
>from
>  x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
>where
>  _type = 'insert'
>  and event_code = 'arriveAndSettlement'
>  ) o
>  LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS 
> OF o.proctime AS r ON r.order_no = o.order_no
>  ) o1
>  LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS 
> r1 ON r1.k = o1.k
>  ) a
>where
>  fence_info is not null
>  ) c
>  LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, 
> fence_coordinates_array) ON TRUE
>  ) as b
> where
>  in_fence(fence_coordinates_array, driver_location)
> group by
>  TUMBLE(dt, INTERVAL '5' MINUTE),
>  fence_id;
>   其中 
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
>   CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
>  _type STRING,
>  _old_id BIGINT,
>  id BIGINT,
>  _old_order_no STRING,
>  order_no STRING,
>  _old_event_code STRING,
>  event_code STRING,
>  _old_from_state TINYINT,
>  from_state TINYINT,
>  _old_to_state TINYINT,
>  to_state TINYINT,
>  _old_operator_type TINYINT,
>  operator_type TINYINT,
>  _old_passenger_location STRING,
>  passenger_location STRING,
>  _old_driver_location STRING,
>  driver_location STRING,
>  _old_trans_time STRING,
>  trans_time STRING,
>  _old_create_time STRING,
>  create_time STRING,
>  _old_update_time STRING,
>  update_time STRING,
>  _old_passenger_poi_address STRING,
>  passenger_poi_address STRING,
>  _old_passenger_detail_address STRING,
>  passenger_detail_address STRING,
>  _old_driver_poi_address STRING,
>  driver_poi_address STRING,
>  _old_driver_detail_address STRING,
>  driver_detail_address STRING,
>  _old_operator STRING,
>  operator STRING,
>  _old_partition_index TINYINT,
>  partition_index TINYINT,
>  dt as TO_TIMESTAMP(trans_time),
>  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.properties.bootstrap.servers' = '*',
>  'connector.properties.zookeeper.connect' = '*',
>  'connector.version' = 'universal',
>  'format.type' = 'json',
>  'connector.properties.group.id' = 'testGroup',
>  'connector.startup-mode' = 'group-offsets',
>  'connector.topic' = 'x'
> )



Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

2020-07-12 文章 zilong xiao
topic是几个分区呢?如果是一个分区,要加一个rebalance参数吧?

wind.fly@outlook.com  于2020年7月13日周一 上午11:46写道:

> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka,
> 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session
> web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
>   x.report.bi_report_fence_common_indicators
> select
>   fence_id,
>   'finishedOrderCnt' as indicator_name,
>   TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
>   count(1) as indicator_val
> from
>   (
> select
>   dt,
>   fence_id,
>   fence_coordinates_array,
>   c.driver_location
> from
>   (
> select
>   *
> from
>   (
> select
>   dt,
>   driver_location,
>   r1.f1.fence_info as fence_info
> from
>   (
> select
>   o.dt,
>   o.driver_location,
>   MD5(r.city_code) as k,
>   PROCTIME() as proctime
> from
>   (
> select
>   order_no,
>   dt,
>   driver_location,
>   PROCTIME() as proctime
> from
>   x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
> where
>   _type = 'insert'
>   and event_code = 'arriveAndSettlement'
>   ) o
>   LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME
> AS OF o.proctime AS r ON r.order_no = o.order_no
>   ) o1
>   LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime
> AS r1 ON r1.k = o1.k
>   ) a
> where
>   fence_info is not null
>   ) c
>   LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id,
> fence_coordinates_array) ON TRUE
>   ) as b
> where
>   in_fence(fence_coordinates_array, driver_location)
> group by
>   TUMBLE(dt, INTERVAL '5' MINUTE),
>   fence_id;
>其中
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
>CREATE TABLE
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
>   _type STRING,
>   _old_id BIGINT,
>   id BIGINT,
>   _old_order_no STRING,
>   order_no STRING,
>   _old_event_code STRING,
>   event_code STRING,
>   _old_from_state TINYINT,
>   from_state TINYINT,
>   _old_to_state TINYINT,
>   to_state TINYINT,
>   _old_operator_type TINYINT,
>   operator_type TINYINT,
>   _old_passenger_location STRING,
>   passenger_location STRING,
>   _old_driver_location STRING,
>   driver_location STRING,
>   _old_trans_time STRING,
>   trans_time STRING,
>   _old_create_time STRING,
>   create_time STRING,
>   _old_update_time STRING,
>   update_time STRING,
>   _old_passenger_poi_address STRING,
>   passenger_poi_address STRING,
>   _old_passenger_detail_address STRING,
>   passenger_detail_address STRING,
>   _old_driver_poi_address STRING,
>   driver_poi_address STRING,
>   _old_driver_detail_address STRING,
>   driver_detail_address STRING,
>   _old_operator STRING,
>   operator STRING,
>   _old_partition_index TINYINT,
>   partition_index TINYINT,
>   dt as TO_TIMESTAMP(trans_time),
>   WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.properties.bootstrap.servers' = '*',
>   'connector.properties.zookeeper.connect' = '*',
>   'connector.version' = 'universal',
>   'format.type' = 'json',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'group-offsets',
>   'connector.topic' = 'x'
> )
>


flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

2020-07-12 文章 wind.fly....@outlook.com
Hi, all:
本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 
当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session 
web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
insert into
  x.report.bi_report_fence_common_indicators
select
  fence_id,
  'finishedOrderCnt' as indicator_name,
  TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
  count(1) as indicator_val
from
  (
select
  dt,
  fence_id,
  fence_coordinates_array,
  c.driver_location
from
  (
select
  *
from
  (
select
  dt,
  driver_location,
  r1.f1.fence_info as fence_info
from
  (
select
  o.dt,
  o.driver_location,
  MD5(r.city_code) as k,
  PROCTIME() as proctime
from
  (
select
  order_no,
  dt,
  driver_location,
  PROCTIME() as proctime
from
  x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
where
  _type = 'insert'
  and event_code = 'arriveAndSettlement'
  ) o
  LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS 
OF o.proctime AS r ON r.order_no = o.order_no
  ) o1
  LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS 
r1 ON r1.k = o1.k
  ) a
where
  fence_info is not null
  ) c
  LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, 
fence_coordinates_array) ON TRUE
  ) as b
where
  in_fence(fence_coordinates_array, driver_location)
group by
  TUMBLE(dt, INTERVAL '5' MINUTE),
  fence_id;
   其中 
x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
   CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
  _type STRING,
  _old_id BIGINT,
  id BIGINT,
  _old_order_no STRING,
  order_no STRING,
  _old_event_code STRING,
  event_code STRING,
  _old_from_state TINYINT,
  from_state TINYINT,
  _old_to_state TINYINT,
  to_state TINYINT,
  _old_operator_type TINYINT,
  operator_type TINYINT,
  _old_passenger_location STRING,
  passenger_location STRING,
  _old_driver_location STRING,
  driver_location STRING,
  _old_trans_time STRING,
  trans_time STRING,
  _old_create_time STRING,
  create_time STRING,
  _old_update_time STRING,
  update_time STRING,
  _old_passenger_poi_address STRING,
  passenger_poi_address STRING,
  _old_passenger_detail_address STRING,
  passenger_detail_address STRING,
  _old_driver_poi_address STRING,
  driver_poi_address STRING,
  _old_driver_detail_address STRING,
  driver_detail_address STRING,
  _old_operator STRING,
  operator STRING,
  _old_partition_index TINYINT,
  partition_index TINYINT,
  dt as TO_TIMESTAMP(trans_time),
  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.properties.bootstrap.servers' = '*',
  'connector.properties.zookeeper.connect' = '*',
  'connector.version' = 'universal',
  'format.type' = 'json',
  'connector.properties.group.id' = 'testGroup',
  'connector.startup-mode' = 'group-offsets',
  'connector.topic' = 'x'
)