[jira] [Updated] (FLINK-32718) 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded

2023-09-23 Thread Neha Aggarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
--
Attachment: (was: Screenshot 2023-07-31 at 10.37.11 AM-1.png)


[jira] [Updated] (FLINK-32718) 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded

2023-09-23 Thread Neha Aggarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
--
Attachment: (was: Screenshot 2023-07-31 at 10.37.11 AM-3.png)


[jira] [Updated] (FLINK-32718) 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded

2023-09-23 Thread Neha Aggarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
--
Attachment: (was: Screenshot 2023-07-31 at 10.37.11 AM-2.png)


[jira] [Updated] (FLINK-32718) 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded

2023-09-23 Thread Neha Aggarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
--
Description: 
I am having trouble understanding why adding a 'UNION ALL' with a 'GROUP BY' 
condition in Flink causes the checkpoint to grow without bound. I've attached 
two Flink SQL queries whose only difference is one additional 'UNION ALL'. 
Surprisingly, as soon as we add the 'UNION ALL' branch to the query, the 
checkpoint size keeps increasing, which is not expected. We anticipated it 
would follow the same pattern of growing and shrinking with the throughput, as 
it does without the 'UNION ALL'.
When I analyze the breakdown of the checkpoint size, the 'Interval Join' 
operator accounts for the largest share. I suspect there might be a bug 
causing this difference in checkpoint behavior once the additional 'GROUP BY' 
branch is added to the query.
We are using Flink 1.16.1 with RocksDB as the state backend, and incremental 
checkpointing is enabled.
PS: Interestingly, when I take periodic savepoints for both jobs, their size 
patterns are similar, and the savepoint is smaller than the checkpoint in the 
job with the 'UNION ALL'. Attaching the queries below.
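
The setup above can be sketched as SQL-client session configuration for anyone 
trying to reproduce it. This is a sketch under assumptions, not taken from the 
report: it presumes the job is submitted through the SQL client, and the 
checkpoint interval is a placeholder value.

{code:sql}
-- Sketch of the reported setup (assumptions: SQL-client submission, and the
-- checkpoint interval is a placeholder; the report only states RocksDB plus
-- incremental checkpointing on Flink 1.16.1).
SET 'state.backend' = 'rocksdb';
SET 'state.backend.incremental' = 'true';
SET 'execution.checkpointing.interval' = '1 min';
{code}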

With UNION ALL:

select
  *
from
  (
with p2d as (
  select
delivered.job_id,
delivered.store_id,
substring(
  GetGeoHash(
CAST(delivered.metadata_json.address.lat as double),
CAST(delivered.metadata_json.address.lng as double)
  )
  from
1 for 6
) AS cx_gh6,
(
  (
cast(
  delivered.metadata_json.lastMileDistance as double
) / cast(1000 as double)
  ) /(
cast(
  delivered.updated_at - pickedup.updated_at as double
) / cast(60 * 60 * 1000 as double)
  )
) as lm_speed_kmph,
delivered.proctime as proctime
  from
awz_s3_OrderLogsEvent pickedup
inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = 
pickedup.job_id
and delivered.status = 'DELIVERY_DELIVERED'
and delivered.type = 'INSTA'
and pickedup.proctime between delivered.proctime - interval '95' minute
and delivered.proctime + interval '5' minute
  where
pickedup.status = 'DELIVERY_PICKEDUP'
and pickedup.type = 'INSTA'
)
select
  'lmSpeedKmph_avg_storeId_30m#' || cast(store_id as varchar) as key,
  round(Avg(lm_speed_kmph), 4) AS `value`
from
  p2d
group by
  HOP(
proctime,
interval '5' minute,
interval '30' minute
  ),
  store_id
union all
select
  'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
  round(Avg(lm_speed_kmph), 4) AS `value`
from
  p2d
group by
  HOP(
proctime,
interval '5' minute,
interval '30' minute
  ),
  cx_gh6
  )
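
As a quick sanity check of the units in the lm_speed_kmph expression above (a 
worked example, not part of the original report; it assumes updated_at holds 
epoch milliseconds, which the 60 * 60 * 1000 divisor implies): 4500 m covered 
in 15 minutes should come out as 18 km/h.

{code:sql}
-- 4500 m / 1000 = 4.5 km; 900000 ms / 3600000 = 0.25 h; 4.5 / 0.25 = 18 km/h.
SELECT
  (cast(4500 as double) / cast(1000 as double))
  / (cast(900000 as double) / cast(60 * 60 * 1000 as double)) AS lm_speed_kmph;
{code}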

 

Without UNION ALL, which shows the expected behaviour:

select
  *
from
  (
with p2d as (
  select
delivered.job_id,
delivered.store_id,
substring(
  GetGeoHash(
CAST(delivered.metadata_json.address.lat as double),
CAST(delivered.metadata_json.address.lng as double)
  )
  from
1 for 6
) AS cx_gh6,
(
  (
cast(
  delivered.metadata_json.lastMileDistance as double
) / cast(1000 as double)
  ) /(
cast(
  delivered.updated_at - pickedup.updated_at as double
) / cast(60 * 60 * 1000 as double)
  )
) as lm_speed_kmph,
delivered.proctime as proctime
  from
awz_s3_OrderLogsEvent pickedup
inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = 
pickedup.job_id
and delivered.status = 'DELIVERY_DELIVERED'
and delivered.type = 'INSTA'
and pickedup.proctime between delivered.proctime - interval '95' minute
and delivered.proctime + interval '5' minute
  where
pickedup.status = 'DELIVERY_PICKEDUP'
and pickedup.type = 'INSTA'
)
select
  'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
  round(Avg(lm_speed_kmph), 4) AS `value`
from
  p2d
group by
  HOP(
proctime,
interval '5' minute,
interval '30' minute
  ),
  cx_gh6
  )

 

 

!withoutHeader.png!
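
Since the two jobs differ only in the extra 'UNION ALL' branch, one way to 
narrow down the suspicion above is to compare the optimized plans and count 
the interval-join operators in each variant. The sketch below is a condensed 
stand-in, not the production job: the datagen source and the trimmed column 
set are illustrative assumptions.

{code:sql}
-- Condensed diagnostic sketch (illustrative schema, not the real tables):
-- EXPLAIN the UNION ALL shape to see whether the planner shares one
-- interval-join operator across both branches or duplicates the p2d pipeline.
CREATE TEMPORARY TABLE events (
  job_id   STRING,
  store_id STRING,
  proctime AS PROCTIME()
) WITH ('connector' = 'datagen');

EXPLAIN PLAN FOR
SELECT key, `value`
FROM (
  WITH p2d AS (
    SELECT d.store_id, d.proctime
    FROM events p
    INNER JOIN events d ON d.job_id = p.job_id
      AND p.proctime BETWEEN d.proctime - INTERVAL '95' MINUTE
                         AND d.proctime + INTERVAL '5' MINUTE
  )
  SELECT 'a#' || store_id AS key, COUNT(*) AS `value`
  FROM p2d
  GROUP BY HOP(proctime, INTERVAL '5' MINUTE, INTERVAL '30' MINUTE), store_id
  UNION ALL
  SELECT 'b#' || store_id AS key, COUNT(*) AS `value`
  FROM p2d
  GROUP BY HOP(proctime, INTERVAL '5' MINUTE, INTERVAL '30' MINUTE), store_id
);
{code}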


[jira] [Updated] (FLINK-32718) 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded

2023-09-23 Thread Neha Aggarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
--
Attachment: (was: Screenshot 2023-07-31 at 10.37.11 AM.png)


[jira] [Updated] (FLINK-32718) 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded

2023-09-23 Thread Neha Aggarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
--
Attachment: withoutHeader.png


[jira] [Updated] (FLINK-32718) 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded

2023-09-23 Thread Neha Aggarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
--
Attachment: Screenshot 2023-07-31 at 10.37.11 AM-3.png


[jira] [Updated] (FLINK-32718) 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded

2023-09-23 Thread Neha Aggarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
--
Attachment: Screenshot 2023-07-31 at 10.37.11 AM-2.png


[jira] [Updated] (FLINK-32718) 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded

2023-09-23 Thread Neha Aggarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
--
Description: 
!Screenshot 2023-07-31 at 10.37.11 AM.png!


[jira] [Updated] (FLINK-32718) 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded

2023-09-23 Thread Neha Aggarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
--
Attachment: Screenshot 2023-07-31 at 10.37.11 AM-1.png


[jira] [Updated] (FLINK-26974) Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure

2023-09-23 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-26974:
---
Labels: auto-deprioritized-major stale-assigned test-stability  (was: 
auto-deprioritized-major test-stability)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure
> -
>
> Key: FLINK-26974
> URL: https://issues.apache.org/jira/browse/FLINK-26974
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yun Gao
>Assignee: Huang Xingbo
>Priority: Critical
>  Labels: auto-deprioritized-major, stale-assigned, test-stability
>
> {code:java}
> Mar 31 10:49:17 === FAILURES 
> ===
> Mar 31 10:49:17 __ 
> EmbeddedThreadDependencyTests.test_add_python_file __
> Mar 31 10:49:17 
> Mar 31 10:49:17 self = 
>  testMethod=test_add_python_file>
> Mar 31 10:49:17 
> Mar 31 10:49:17 def test_add_python_file(self):
> Mar 31 10:49:17 python_file_dir = os.path.join(self.tempdir, 
> "python_file_dir_" + str(uuid.uuid4()))
> Mar 31 10:49:17 os.mkdir(python_file_dir)
> Mar 31 10:49:17 python_file_path = os.path.join(python_file_dir, 
> "test_dependency_manage_lib.py")
> Mar 31 10:49:17 with open(python_file_path, 'w') as f:
> Mar 31 10:49:17 f.write("def add_two(a):\nraise 
> Exception('This function should not be called!')")
> Mar 31 10:49:17 self.t_env.add_python_file(python_file_path)
> Mar 31 10:49:17 
> Mar 31 10:49:17 python_file_dir_with_higher_priority = os.path.join(
> Mar 31 10:49:17 self.tempdir, "python_file_dir_" + 
> str(uuid.uuid4()))
> Mar 31 10:49:17 os.mkdir(python_file_dir_with_higher_priority)
> Mar 31 10:49:17 python_file_path_higher_priority = 
> os.path.join(python_file_dir_with_higher_priority,
> Mar 31 10:49:17 
> "test_dependency_manage_lib.py")
> Mar 31 10:49:17 with open(python_file_path_higher_priority, 'w') as f:
> Mar 31 10:49:17 f.write("def add_two(a):\nreturn a + 2")
> Mar 31 10:49:17 
> self.t_env.add_python_file(python_file_path_higher_priority)
> Mar 31 10:49:17 
> Mar 31 10:49:17 def plus_two(i):
> Mar 31 10:49:17 from test_dependency_manage_lib import add_two
> Mar 31 10:49:17 return add_two(i)
> Mar 31 10:49:17 
> Mar 31 10:49:17 self.t_env.create_temporary_system_function(
> Mar 31 10:49:17 "add_two", udf(plus_two, DataTypes.BIGINT(), 
> DataTypes.BIGINT()))
> Mar 31 10:49:17 table_sink = source_sink_utils.TestAppendSink(
> Mar 31 10:49:17 ['a', 'b'], [DataTypes.BIGINT(), 
> DataTypes.BIGINT()])
> Mar 31 10:49:17 self.t_env.register_table_sink("Results", table_sink)
> Mar 31 10:49:17 t = self.t_env.from_elements([(1, 2), (2, 5), (3, 
> 1)], ['a', 'b'])
> Mar 31 10:49:17 >   t.select(expr.call("add_two", t.a), 
> t.a).execute_insert("Results").wait()
> Mar 31 10:49:17 
> Mar 31 10:49:17 pyflink/table/tests/test_dependency.py:63: 
> Mar 31 10:49:17 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ 
> Mar 31 10:49:17 pyflink/table/table_result.py:76: in wait
> Mar 31 10:49:17 get_method(self._j_table_result, "await")()
> Mar 31 10:49:17 
> .tox/py38-cython/lib/python3.8/site-packages/py4j/java_gateway.py:1321: in 
> __call__
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34001=logs=821b528f-1eed-5598-a3b4-7f748b13f261=6bb545dd-772d-5d8c-f258-f5085fba3295=27239





[jira] [Updated] (FLINK-32605) JoinITCase.testFullOuterJoinWithMultipleKeys fails with TimeoutException: Futures timed out after [20 seconds]

2023-09-23 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-32605:
---
  Labels: auto-deprioritized-major test-stability  (was: stale-major 
test-stability)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue, or revive the 
public discussion.


> JoinITCase.testFullOuterJoinWithMultipleKeys fails with TimeoutException: 
> Futures timed out after [20 seconds]
> --
>
> Key: FLINK-32605
> URL: https://issues.apache.org/jira/browse/FLINK-32605
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / RPC, Table SQL / API
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> During execution of
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51254=logs=a9db68b9-a7e0-54b6-0f98-010e0aff39e2=cdd32e0b-6047-565b-c58f-14054472f1be=11681
> the following exception occurred:
> {noformat}
> Jul 14 04:35:32 Caused by: java.lang.Exception: Could not create actor system
> Jul 14 04:35:32   at 
> org.apache.flink.runtime.rpc.akka.AkkaBootstrapTools.startLocalActorSystem(AkkaBootstrapTools.java:238)
> Jul 14 04:35:32   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:349)
> Jul 14 04:35:32   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:327)
> Jul 14 04:35:32   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:247)
> Jul 14 04:35:32   at 
> org.apache.flink.runtime.minicluster.MiniCluster.createLocalRpcService(MiniCluster.java:1188)
> Jul 14 04:35:32   at 
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:355)
> Jul 14 04:35:32   at 
> org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:77)
> Jul 14 04:35:32   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)
> Jul 14 04:35:32   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2218)
> Jul 14 04:35:32   at 
> org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110)
> Jul 14 04:35:32   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:992)
> Jul 14 04:35:32   ... 102 more
> Jul 14 04:35:32 Caused by: java.util.concurrent.TimeoutException: Futures 
> timed out after [20 seconds]
> Jul 14 04:35:32   at 
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
> Jul 14 04:35:32   at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
> Jul 14 04:35:32   at 
> scala.concurrent.Await$.$anonfun$result$1(package.scala:223)
> Jul 14 04:35:32   at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
> Jul 14 04:35:32   at scala.concurrent.Await$.result(package.scala:146)
> Jul 14 04:35:32   at 
> akka.stream.SystemMaterializer.(SystemMaterializer.scala:90)
> Jul 14 04:35:32   at 
> akka.stream.SystemMaterializer$.createExtension(SystemMaterializer.scala:39)
> Jul 14 04:35:32   at 
> akka.stream.SystemMaterializer$.createExtension(SystemMaterializer.scala:32)
> Jul 14 04:35:32   at 
> akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:1165)
> Jul 14 04:35:32   at 
> akka.actor.ActorSystemImpl.$anonfun$loadExtensions$1(ActorSystem.scala:1208)
> Jul 14 04:35:32   at scala.collection.Iterator.foreach(Iterator.scala:943)
> Jul 14 04:35:32   at 
> scala.collection.Iterator.foreach$(Iterator.scala:943)
> Jul 14 04:35:32   at 
> org.apache.flink.runtime.rpc.akka.RobustActorSystem.create(RobustActorSystem.java:54)
> Jul 14 04:35:32   at 
> org.apache.flink.runtime.rpc.akka.AkkaUtils.createActorSystem(AkkaUtils.java:421)
> Jul 14 04:35:32   at 
> org.apache.flink.runtime.rpc.akka.AkkaBootstrapTools.startActorSystem(AkkaBootstrapTools.java:253)
> Jul 14 04:35:32   at 
> org.apache.flink.runtime.rpc.akka.AkkaBootstrapTools.startLocalActorSystem(AkkaBootstrapTools.java:236)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28879) New File Sink s3 end-to-end test failed with Output hash mismatch

2023-09-23 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-28879:
---
Labels: stale-major test-stability  (was: test-stability)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added a "stale-major" label to the issue. 
If this ticket is actually Major, please either assign yourself or give an 
update. Afterwards, please remove the label, or the issue will be 
deprioritized in 7 days.


> New File Sink s3 end-to-end test failed with Output hash mismatch
> -
>
> Key: FLINK-28879
> URL: https://issues.apache.org/jira/browse/FLINK-28879
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / FileSystem, Tests
>Affects Versions: 1.16.0, 1.17.2
>Reporter: Huang Xingbo
>Priority: Major
>  Labels: stale-major, test-stability
>
> {code:java}
> 2022-08-09T00:50:02.8229585Z Aug 09 00:50:02 FAIL File Streaming Sink: Output 
> hash mismatch.  Got 6037b01ca0ffc61a95c12cb475c661a8, expected 
> 6727342fdd3aae2129e61fc8f433fb6f.
> 2022-08-09T00:50:02.8230700Z Aug 09 00:50:02 head hexdump of actual:
> 2022-08-09T00:50:02.8477319Z Aug 09 00:50:02 000   E   r   r   o   r  
>  e   x   e   c   u   t   i   n   g
> 2022-08-09T00:50:02.8478206Z Aug 09 00:50:02 010   a   w   s   c   o  
>  m   m   a   n   d   :   s   3
> 2022-08-09T00:50:02.8479475Z Aug 09 00:50:02 020   c   p   -   -   q  
>  u   i   e   t   s   3   :   /   /
> 2022-08-09T00:50:02.8480205Z Aug 09 00:50:02 030   f   l   i   n   k   -  
>  i   n   t   e   g   r   a   t   i   o
> 2022-08-09T00:50:02.8480924Z Aug 09 00:50:02 040   n   -   t   e   s   t  
>  s   /   t   e   m   p   /   t   e   s
> 2022-08-09T00:50:02.8481612Z Aug 09 00:50:02 050   t   _   f   i   l   e  
>  _   s   i   n   k   -   1   d   3   d
> 2022-08-09T00:50:02.8483048Z Aug 09 00:50:02 060   4   0   0   8   -   b  
>  0   b   f   -   4   2   6   5   -   b
> 2022-08-09T00:50:02.8483618Z Aug 09 00:50:02 070   e   0   e   -   3   b  
>  9   f   7   8   2   c   5   5   2   d
> 2022-08-09T00:50:02.8484222Z Aug 09 00:50:02 080   /   h   o   s   t  
>  d   i   r   /   /   t   e   m   p   -
> 2022-08-09T00:50:02.8484831Z Aug 09 00:50:02 090   t   e   s   t   -   d  
>  i   r   e   c   t   o   r   y   -   2
> 2022-08-09T00:50:02.8485719Z Aug 09 00:50:02 0a0   3   9   3   7   7   8  
>  2   6   8   0   /   t   e   m   p   /
> 2022-08-09T00:50:02.8486427Z Aug 09 00:50:02 0b0   t   e   s   t   _   f  
>  i   l   e   _   s   i   n   k   -   1
> 2022-08-09T00:50:02.8487134Z Aug 09 00:50:02 0c0   d   3   d   4   0   0  
>  8   -   b   0   b   f   -   4   2   6
> 2022-08-09T00:50:02.8487826Z Aug 09 00:50:02 0d0   5   -   b   e   0   e  
>  -   3   b   9   f   7   8   2   c   5
> 2022-08-09T00:50:02.8488511Z Aug 09 00:50:02 0e0   5   2   d   -   -  
>  e   x   c   l   u   d   e   '   *
> 2022-08-09T00:50:02.8489202Z Aug 09 00:50:02 0f0   '   -   -   i   n  
>  c   l   u   d   e   '   *   /   p
> 2022-08-09T00:50:02.8489891Z Aug 09 00:50:02 100   a   r   t   -   [   !  
>  /   ]   *   '   -   -   r   e   c
> 2022-08-09T00:50:02.8490385Z Aug 09 00:50:02 110   u   r   s   i   v   e  
> \n
> 2022-08-09T00:50:02.8490822Z Aug 09 00:50:02 117
> 2022-08-09T00:50:02.8502212Z Aug 09 00:50:02 Stopping job timeout watchdog 
> (with pid=141134)
> 2022-08-09T00:50:06.8430959Z rm: cannot remove 
> '/home/vsts/work/1/s/flink-dist/target/flink-1.16-SNAPSHOT-bin/flink-1.16-SNAPSHOT/lib/flink-shaded-netty-tcnative-static-*.jar':
>  No such file or directory
> 2022-08-09T00:50:06.9278248Z Aug 09 00:50:06 
> 5ccfeb22307c2a88625a38b9537acc79001d1b29094ef40fd70692ce11407502
> 2022-08-09T00:50:06.9618147Z Aug 09 00:50:06 
> 5ccfeb22307c2a88625a38b9537acc79001d1b29094ef40fd70692ce11407502
> 2022-08-09T00:50:06.9645077Z Aug 09 00:50:06 [FAIL] Test script contains 
> errors.
> 2022-08-09T00:50:06.9666227Z Aug 09 00:50:06 Checking of logs skipped.
> 2022-08-09T00:50:06.9671891Z Aug 09 00:50:06 
> 2022-08-09T00:50:06.9673050Z Aug 09 00:50:06 [FAIL] 'New File Sink s3 
> end-to-end test' failed after 3 minutes and 42 seconds! Test exited with exit 
> code 1
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39667=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=4136





[jira] [Updated] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2023-09-23 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-31860:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Jayme Howard
>Priority: Blocker
>  Labels: pull-request-available, stale-assigned
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
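>  
> As a stopgap, the finalizer can be cleared by hand. A minimal sketch of that 
> manual cleanup, assuming the fabric8 Kubernetes client (the group and version 
> match the FlinkDeployment example below; the namespace and resource name are 
> placeholders):
> {code:java}
> import io.fabric8.kubernetes.client.KubernetesClient;
> import io.fabric8.kubernetes.client.KubernetesClientBuilder;
> import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext;
> 
> import java.util.Collections;
> 
> public class FinalizerCleanup {
>     public static void main(String[] args) {
>         // CRD coordinates of the stuck FlinkDeployment resource.
>         ResourceDefinitionContext ctx = new ResourceDefinitionContext.Builder()
>                 .withGroup("flink.apache.org")
>                 .withVersion("v1beta1")
>                 .withKind("FlinkDeployment")
>                 .withPlural("flinkdeployments")
>                 .withNamespaced(true)
>                 .build();
>         try (KubernetesClient client = new KubernetesClientBuilder().build()) {
>             // Drop the finalizers so the blocked namespace deletion can complete.
>             client.genericKubernetesResources(ctx)
>                     .inNamespace("my-namespace")      // placeholder
>                     .withName("my-flink-deployment")  // placeholder
>                     .edit(r -> {
>                         r.getMetadata().setFinalizers(Collections.emptyList());
>                         return r;
>                     });
>         }
>     }
> }
> {code}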
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: '# This affects logging for both user code and 
> Flink      rootLogger.level = INFO      rootLogger.appenderRef.console.ref = 
> ConsoleAppender      rootLogger.appenderRef.rolling.ref = RollingFileAppender 
>      # Uncomment this if you want to _only_ change Flink''s logging      
> #logger.flink.name = org.apache.flink      #logger.flink.level = INFO      # 
> The following lines keep the log level of common libraries/connectors on      
> # log level INFO. The root logger does not override this. You have to 
> manually      # change the log levels here.      logger.akka.name = akka      
> logger.akka.level = INFO      logger.kafka.name= org.apache.kafka      
> logger.kafka.level = INFO      logger.hadoop.name = org.apache.hadoop      
> logger.hadoop.level = INFO      logger.zookeeper.name = org.apache.zookeeper  
>     logger.zookeeper.level = INFO      # Log all infos to the console      
> appender.console.name = ConsoleAppender      

[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic

2023-09-23 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-33135:

Description: 
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special/different case. Currently, 
the following error is raised even though the setTopics() method is called on 
the PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more
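
For reference, a minimal sketch of the kind of sink construction that hits this 
path, assuming the builder API of the 1.17 Pulsar connector (the URLs and the 
topic name below are placeholders):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;

// A single unpartitioned topic is set, yet the default RoundRobinTopicRouter
// still ends up routing over an empty partition list at runtime.
PulsarSink<String> sink = PulsarSink.builder()
        .setServiceUrl("pulsar://localhost:6650")           // placeholder
        .setAdminUrl("http://localhost:8080")               // placeholder
        .setTopics("persistent://public/default/events")    // unpartitioned topic
        .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
        .build();
{code}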

The distinctTopics() method of the TopicNameUtils class is what ensures the 
list of partitions is empty for a nonpartitioned topic.

  was:
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special/different case. Currently, 
the following error is raised even though the setTopics() method is called on 
the PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

The distinctTopics() method of the TopicNameUtils class is what ensures the 
list of partitions is empty.


> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
> --
>
> Key: FLINK-33135
> URL: https://issues.apache.org/jira/browse/FLINK-33135
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.1
>Reporter: Jason Kania
>Priority: Major
>
> For a non-partitioned topic, the Flink Pulsar connector creates a 
> TopicRouter, ensures that the list of partitions is empty during deployment 
> and then complains when the list of partitions supplied to it is empty at 
> runtime. The default TopicRouter that is created is the RoundRobinTopicRouter 
> and it provides a nonsensical error for this type of TopicRouter. This error 
> message issue is raised in ticket 
> https://issues.apache.org/jira/browse/FLINK-33136.
> The connector should not be applying a topic router to nonpartitioned topics 
> or should treat the nonpartitioned topic as a special/different case. 
> Currently, the following error is raised even though the setTopics() method 
> is called on the PulsarSink.builder() with a single topic.
> Caused by: java.lang.IllegalArgumentException: You should provide topics for 
> routing topic by message key hash.
>         at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
>         at 
> 

[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic

2023-09-23 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-33135:

Description: 
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special/different case. Currently, 
the following error is raised even though the setTopics() method is called on 
the PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

The distinctTopics() method of the TopicNameUtils class is what ensures the 
list of partitions is empty.

  was:
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

The distinctTopics() method of the TopicNameUtils class is what ensures the 
list of partitions is empty.


> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
> --
>
> Key: FLINK-33135
> URL: https://issues.apache.org/jira/browse/FLINK-33135
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.1
>Reporter: Jason Kania
>Priority: Major
>
> For a non-partitioned topic, the Flink Pulsar connector creates a 
> TopicRouter, ensures that the list of partitions is empty during deployment 
> and then complains when the list of partitions supplied to it is empty at 
> runtime. The default TopicRouter that is created is the RoundRobinTopicRouter 
> and it provides a nonsensical error for this type of TopicRouter. This error 
> message issue is raised in ticket 
> https://issues.apache.org/jira/browse/FLINK-33136.
> The connector should not be applying a topic router to nonpartitioned topics 
> or should treat the nonpartitioned topic as a special/different case. 
> Currently, the following error is raised even though the setTopics() method 
> is called on the PulsarSink.builder() with a single topic.
> Caused by: java.lang.IllegalArgumentException: You should provide topics for 
> routing topic by message key hash.
>         at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
>         at 
> 

[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic

2023-09-23 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-33135:

Description: 
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

The distinctTopics() method of the TopicNameUtils class is what ensures the 
list of partitions is empty.

  was:
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

 

 


> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
> --
>
> Key: FLINK-33135
> URL: https://issues.apache.org/jira/browse/FLINK-33135
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.1
>Reporter: Jason Kania
>Priority: Major
>
> For a non-partitioned topic, the Flink Pulsar connector creates a 
> TopicRouter, ensures that the list of partitions is empty during deployment 
> and then complains when the list of partitions supplied to it is empty at 
> runtime. The default TopicRouter that is created is the RoundRobinTopicRouter 
> and it provides a nonsensical error for this type of TopicRouter. This error 
> message issue is raised in ticket 
> https://issues.apache.org/jira/browse/FLINK-33136.
> The connector should not be applying a topic router to nonpartitioned topics 
> or should treat the nonpartitioned topic as a special case. Currently, the 
> following error is raised even though the setTopics() method is called on the 
> PulsarSink.builder() with a single topic.
> Caused by: java.lang.IllegalArgumentException: You should provide topics for 
> routing topic by message key hash.
>         at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
>         at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
>         at 
> 

[jira] [Updated] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic

2023-09-23 Thread Jason Kania (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Kania updated FLINK-33135:

Description: 
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket https://issues.apache.org/jira/browse/FLINK-33136.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

 

 

  was:
For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket ???.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

 

 


> Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic
> --
>
> Key: FLINK-33135
> URL: https://issues.apache.org/jira/browse/FLINK-33135
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.1
>Reporter: Jason Kania
>Priority: Major
>
> For a non-partitioned topic, the Flink Pulsar connector creates a 
> TopicRouter, ensures that the list of partitions is empty during deployment 
> and then complains when the list of partitions supplied to it is empty at 
> runtime. The default TopicRouter that is created is the RoundRobinTopicRouter 
> and it provides a nonsensical error for this type of TopicRouter. This error 
> message issue is raised in ticket 
> https://issues.apache.org/jira/browse/FLINK-33136.
> The connector should not be applying a topic router to nonpartitioned topics 
> or should treat the nonpartitioned topic as a special case. Currently, the 
> following error is raised even though the setTopics() method is called on the 
> PulsarSink.builder() with a single topic.
> Caused by: java.lang.IllegalArgumentException: You should provide topics for 
> routing topic by message key hash.
>         at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
>         at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
>         at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>         ... 30 more
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Created] (FLINK-33136) Flink Pulsar Connector RoundRobinTopicRouter Generates Invalid Error Message

2023-09-23 Thread Jason Kania (Jira)
Jason Kania created FLINK-33136:
---

 Summary: Flink Pulsar Connector RoundRobinTopicRouter Generates 
Invalid Error Message
 Key: FLINK-33136
 URL: https://issues.apache.org/jira/browse/FLINK-33136
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.1
Reporter: Jason Kania


The RoundRobinTopicRouter class generates the runtime error "You should provide 
topics for routing topic by message key hash." when no partitions are set. This 
error is a direct copy of the error in the KeyHashTopicRouter but is 
nonsensical for a RoundRobinTopicRouter, since hashing is not involved in route 
selection.

More importantly, however, this error should be detected at deploy time, when 
the PulsarSink is built with the builder, since the list of topics is supplied 
via the setTopics() method of the builder.

Additionally, the wording of the error is not clear in any case and could be 
improved to something like: "No partition routing topics were provided to allow 
for topic routing".
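
For illustration, a hedged sketch of what a router-specific precondition with 
the clearer wording could look like (the class name and method shape below are 
simplified stand-ins, not the actual connector code):

{code:java}
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class RoundRobinRouterSketch {
    private final AtomicLong counter = new AtomicLong();

    // Simplified stand-in for RoundRobinTopicRouter.route(); the real method
    // also receives the message and a sink context.
    String route(List<String> partitions) {
        if (partitions == null || partitions.isEmpty()) {
            // Router-specific wording instead of the copied key-hash message.
            throw new IllegalArgumentException(
                    "No partition routing topics were provided to allow for topic routing.");
        }
        int index = Math.floorMod((int) counter.getAndIncrement(), partitions.size());
        return partitions.get(index);
    }
}
{code}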

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33135) Flink Pulsar Connector Attempts Partitioned Routing on Unpartitioned Topic

2023-09-23 Thread Jason Kania (Jira)
Jason Kania created FLINK-33135:
---

 Summary: Flink Pulsar Connector Attempts Partitioned Routing on 
Unpartitioned Topic
 Key: FLINK-33135
 URL: https://issues.apache.org/jira/browse/FLINK-33135
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.1
Reporter: Jason Kania


For a non-partitioned topic, the Flink Pulsar connector creates a TopicRouter, 
ensures that the list of partitions is empty during deployment and then 
complains when the list of partitions supplied to it is empty at runtime. The 
default TopicRouter that is created is the RoundRobinTopicRouter and it 
provides a nonsensical error for this type of TopicRouter. This error message 
issue is raised in ticket ???.

The connector should not be applying a topic router to nonpartitioned topics or 
should treat the nonpartitioned topic as a special case. Currently, the 
following error is raised even though the setTopics() method is called on the 
PulsarSink.builder() with a single topic.

Caused by: java.lang.IllegalArgumentException: You should provide topics for 
routing topic by message key hash.
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at 
org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:56)
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:147)
        at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 30 more

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31854) FLIP-307: Flink Connector Redshift

2023-09-23 Thread Samrat Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Samrat Deb updated FLINK-31854:
---
Labels:   (was: aws)

> FLIP-307: Flink Connector Redshift 
> ---
>
> Key: FLINK-31854
> URL: https://issues.apache.org/jira/browse/FLINK-31854
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Reporter: Samrat Deb
>Priority: Major
>
> This is an umbrella Jira for 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31854) FLIP-307: Flink Connector Redshift

2023-09-23 Thread Samrat Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Samrat Deb updated FLINK-31854:
---
Labels: aws  (was: )

> FLIP-307: Flink Connector Redshift 
> ---
>
> Key: FLINK-31854
> URL: https://issues.apache.org/jira/browse/FLINK-31854
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Reporter: Samrat Deb
>Priority: Major
>  Labels: aws
>
> This is an umbrella Jira for 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33134) Flink Connector redshift E2E test

2023-09-23 Thread Samrat Deb (Jira)
Samrat Deb created FLINK-33134:
--

 Summary: Flink Connector redshift E2E test 
 Key: FLINK-33134
 URL: https://issues.apache.org/jira/browse/FLINK-33134
 Project: Flink
  Issue Type: Sub-task
Reporter: Samrat Deb






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33133) Flink Document Update

2023-09-23 Thread Samrat Deb (Jira)
Samrat Deb created FLINK-33133:
--

 Summary: Flink Document Update 
 Key: FLINK-33133
 URL: https://issues.apache.org/jira/browse/FLINK-33133
 Project: Flink
  Issue Type: Sub-task
Reporter: Samrat Deb






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33132) Flink Connector Redshift Sink Implementation

2023-09-23 Thread Samrat Deb (Jira)
Samrat Deb created FLINK-33132:
--

 Summary: Flink Connector Redshift Sink Implementation 
 Key: FLINK-33132
 URL: https://issues.apache.org/jira/browse/FLINK-33132
 Project: Flink
  Issue Type: Sub-task
Reporter: Samrat Deb






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31854) FLIP-307: Flink Connector Redshift

2023-09-23 Thread Samrat Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Samrat Deb updated FLINK-31854:
---
Summary: FLIP-307: Flink Connector Redshift   (was: Flink connector 
redshift )

> FLIP-307: Flink Connector Redshift 
> ---
>
> Key: FLINK-31854
> URL: https://issues.apache.org/jira/browse/FLINK-31854
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Reporter: Samrat Deb
>Priority: Major
>
> This is an umbrella Jira for 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] jayadeep-jayaraman commented on pull request #22281: [FLINK-31631][FileSystems] Upgrade GCS connector to 2.2.11.

2023-09-23 Thread via GitHub


jayadeep-jayaraman commented on PR #22281:
URL: https://github.com/apache/flink/pull/22281#issuecomment-1732361470

   Hi @cnauroth - There are multiple code paths and APIs in Flink that use 
different Google libraries, and we should make them consistent.
   
   For instance:
   - Writes that use `saveAsText`, `saveAsCSV`, etc. go via the 
[hadoop-gcs](https://github.com/GoogleCloudDataproc/hadoop-connectors) 
connector.
   - Checkpoints also go via the 
[hadoop-gcs](https://github.com/GoogleCloudDataproc/hadoop-connectors) 
connector.
   - The FileSink API, which is used to write the final files to GCS, uses the 
Java storage library.
   
   The ideal scenario would be for everything to go via the 
[hadoop-gcs](https://github.com/GoogleCloudDataproc/hadoop-connectors) 
connector.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33131) Grafana dashboard for flink k8 operator metrics

2023-09-23 Thread Gaurav Miglani (Jira)
Gaurav Miglani created FLINK-33131:
--

 Summary: Grafana dashboard for flink k8 operator metrics
 Key: FLINK-33131
 URL: https://issues.apache.org/jira/browse/FLINK-33131
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gaurav Miglani


I have created a Grafana dashboard for the Flink Kubernetes operator. Can I 
commit it to the flink-kubernetes-operator repo?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] cnauroth commented on pull request #22281: [FLINK-31631][FileSystems] Upgrade GCS connector to 2.2.11.

2023-09-23 Thread via GitHub


cnauroth commented on PR #22281:
URL: https://github.com/apache/flink/pull/22281#issuecomment-1732359065

   @yigress , thank you for reporting this.
   
   The stack trace appears to be in the checkpointing code path, which uses the 
GCS client directly (not the Hadoop `FileSystem`). Looking back at my testing 
notes in the pull request description, I think I tested reading and writing 
through the `FileSystem`, but not this direct client code path. Sorry about 
that.
   
   The patch upgraded from google-cloud-storage 2.2.3 (transitive dependency to 
Guava 31.0.1-jre) to google-cloud-storage 2.15.0 (transitive dependency to 
Guava 31.1-jre). Both the GCS client and the `FileSystem` want Guava 31.1-jre 
at this point. The `FileSystem` was insulated though, because it shades Guava. 
The most likely answer now is for flink-gs-fs-hadoop to shade its Guava too. (I 
hope that's not going to bump into new problems with flink-fs-hadoop-shaded 
though.) I'll aim to test a new patch next week.
   
   Long-term, it would be great to find a way to re-route this checkpointing 
code path through the `FileSystem`, so that we can keep the dependency 
management consistent throughout. I haven't explored what that would take yet.
   
   CC: @MartijnVisser , @jayadeep-jayaraman 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33098) Support kubernetes autoscaler using generic interface

2023-09-23 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33098:

Description: 
 # Moving all classes that aren't related to Kubernetes to the flink-autoscaler module
 # Support kubernetes autoscaler using generic interface (the most important 
commit)
 ## Migrate all generic classes from the flink-kubernetes-operator-autoscaler 
module to the autoscaler module
 ## Migrate all Kubernetes implementation classes from the 
flink-kubernetes-operator-autoscaler module to the flink-kubernetes-operator module
 ## Support all old tests
 ## Removing the flink-kubernetes-operator-autoscaler module
 # Removing the option prefix (kubernetes.operator.) for all options and updating 
the doc (all old option names are marked with withDeprecatedKeys to ensure 
compatibility)

  was:
# Moving all classes that aren't related to Kubernetes to the flink-autoscaler module
 # Support kubernetes autoscaler using generic interface (the most important 
commit)

 ## Migrate all generic classes from the flink-kubernetes-operator-autoscaler 
module to the autoscaler module
 ## Migrate all Kubernetes implementation classes from the 
flink-kubernetes-operator-autoscaler module to the flink-kubernetes-operator module
 ## Support all old tests
 ## Removing the flink-kubernetes-operator-autoscaler module
 # Removing the option prefix (kubernetes.operator.) for all options and updating 
the doc (all old option names are marked with withDeprecatedKeys to ensure 
compatibility)


> Support kubernetes autoscaler using generic interface
> -
>
> Key: FLINK-33098
> URL: https://issues.apache.org/jira/browse/FLINK-33098
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
>  # Moving all classes that aren't related to Kubernetes to the flink-autoscaler module
>  # Support kubernetes autoscaler using generic interface (the most important 
> commit)
>  ## Migrate all generic classes from the flink-kubernetes-operator-autoscaler 
> module to the autoscaler module
>  ## Migrate all Kubernetes implementation classes from the 
> flink-kubernetes-operator-autoscaler module to the flink-kubernetes-operator 
> module
>  ## Support all old tests
>  ## Removing the flink-kubernetes-operator-autoscaler module
>  # Removing the option prefix (kubernetes.operator.) for all options and 
> updating the doc (all old option names are marked with withDeprecatedKeys to 
> ensure compatibility)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33098) Support kubernetes autoscaler using generic interface

2023-09-23 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33098:

Description: 
# Moving all classes that aren't related to Kubernetes to the flink-autoscaler module
 # Support kubernetes autoscaler using generic interface (the most important 
commit)

 ## Migrate all generic classes from the flink-kubernetes-operator-autoscaler 
module to the autoscaler module
 ## Migrate all Kubernetes implementation classes from the 
flink-kubernetes-operator-autoscaler module to the flink-kubernetes-operator module
 ## Support all old tests
 ## Removing the flink-kubernetes-operator-autoscaler module
 # Removing the option prefix (kubernetes.operator.) for all options and updating 
the doc (all old option names are marked with withDeprecatedKeys to ensure 
compatibility)

  was:
# Moving all classes that aren't related to Kubernetes to the flink-autoscaler module
 # Support kubernetes autoscaler using generic interface
 # Removing the flink-kubernetes-operator-autoscaler module
 # Removing the option prefix (kubernetes.operator.) for all options and updating 
the doc (all old option names are marked with withDeprecatedKeys to ensure 
compatibility)


> Support kubernetes autoscaler using generic interface
> -
>
> Key: FLINK-33098
> URL: https://issues.apache.org/jira/browse/FLINK-33098
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> # Moving all classes that aren't related to Kubernetes to the flink-autoscaler module
>  # Support kubernetes autoscaler using generic interface (the most important 
> commit)
>  ## Migrate all generic classes from the flink-kubernetes-operator-autoscaler 
> module to the autoscaler module
>  ## Migrate all Kubernetes implementation classes from the 
> flink-kubernetes-operator-autoscaler module to the flink-kubernetes-operator 
> module
>  ## Support all old tests
>  ## Removing the flink-kubernetes-operator-autoscaler module
>  # Removing the option prefix (kubernetes.operator.) for all options and 
> updating the doc (all old option names are marked with withDeprecatedKeys to 
> ensure compatibility)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] hlteoh37 commented on pull request #94: [FLINK-33073][Connectors/AWS] Implement end-to-end tests for KinesisS…

2023-09-23 Thread via GitHub


hlteoh37 commented on PR #94:
URL: 
https://github.com/apache/flink-connector-aws/pull/94#issuecomment-1732336801

   Verified that the AWS end-to-end tests all pass when run locally
   
   ```
   FLINK_AWS_USER= FLINK_AWS_PASSWORK= mvn clean verify \
     -Prun-aws-end-to-end-tests \
     -DdistDir=~/workplace/flink_os/flink/flink-dist/target/flink-dist_2.12-1.18-SNAPSHOT.jar \
     -pl flink-connector-aws-kinesis-streams-e2e-tests \
     -Dflink.version=1.18-SNAPSHOT
   ```
   
   ```
   [INFO] --- maven-surefire-plugin:3.0.0-M5:test (end-to-end-tests) @ 
flink-connector-aws-kinesis-streams-e2e-tests ---
   [INFO] 
   [INFO] ---
   [INFO]  T E S T S
   [INFO] ---
   [INFO] Running 
org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkITCase
   [INFO] Tests run: 11, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 
72.61 s - in org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkITCase
   [INFO] 
   [INFO] Results:
   [INFO] 
   [INFO] Tests run: 11, Failures: 0, Errors: 0, Skipped: 0
   [INFO] 
   [INFO] 

   [INFO] BUILD SUCCESS
   [INFO] 

   [INFO] Total time:  01:27 min
   [INFO] Finished at: 2023-09-23T15:55:17+01:00
   [INFO] 

   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27429) Implement Vertica JDBC Dialect

2023-09-23 Thread Jasmin Redzepovic (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768257#comment-17768257
 ] 

Jasmin Redzepovic commented on FLINK-27429:
---

Hi [~snuyanzin] , could you please find some time to review this PR 
[https://github.com/apache/flink-connector-jdbc/pull/55]? There are just a few 
lines of code. :)

Thank you in advance.

 

> Implement Vertica JDBC Dialect
> --
>
> Key: FLINK-27429
> URL: https://issues.apache.org/jira/browse/FLINK-27429
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Affects Versions: 1.16.0
>Reporter: Jasmin Redzepovic
>Assignee: Jasmin Redzepovic
>Priority: Minor
>  Labels: pull-request-available
>
> In order to use Vertica database as a JDBC source or sink, corresponding 
> dialect has to be implemented.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30831) Improving e2e test output in case errors/exceptions are found

2023-09-23 Thread Flaviu Cicio (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768256#comment-17768256
 ] 

Flaviu Cicio commented on FLINK-30831:
--

[~mapohl] Can I take this ticket as my first one?

> Improving e2e test output in case errors/exceptions are found
> -
>
> Key: FLINK-30831
> URL: https://issues.apache.org/jira/browse/FLINK-30831
> Project: Flink
>  Issue Type: Improvement
>  Components: Test Infrastructure
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Matthias Pohl
>Priority: Minor
>  Labels: auto-deprioritized-major, starter
>
> Some e2e tests parse the Flink logs for exceptions using {{grep}}. We then 
> print the first 500 lines of the each log file in case an exception that 
> shouldn't be ignored is found (see 
> [internal_check_logs_for_exceptions|https://github.com/apache/flink/blob/c9e87fe410c42f7e7c19c81456d4212a58564f5e/flink-end-to-end-tests/test-scripts/common.sh#L449]
>  or 
> [check_logs_for_errors|https://github.com/apache/flink/blob/c9e87fe410c42f7e7c19c81456d4212a58564f5e/flink-end-to-end-tests/test-scripts/common.sh#L387]).
>  Instead, we could use {{grep -C200}} to actually print the context of the 
> exception.
> This would help especially in those situations where the exception doesn't 
> appear in the first 500 lines.
> This issue does not necessarily only include the two aforementioned code 
> locations. One should check the scripts for other code with a similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30831) Improving e2e test output in case errors/exceptions are found

2023-09-23 Thread Flaviu Cicio (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768256#comment-17768256
 ] 

Flaviu Cicio edited comment on FLINK-30831 at 9/23/23 9:55 AM:
---

[~mapohl] Can I have this ticket as my first one?


was (Author: JIRAUSER302297):
[~mapohl] Can I take this ticket as my first one?

> Improving e2e test output in case errors/exceptions are found
> -
>
> Key: FLINK-30831
> URL: https://issues.apache.org/jira/browse/FLINK-30831
> Project: Flink
>  Issue Type: Improvement
>  Components: Test Infrastructure
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Matthias Pohl
>Priority: Minor
>  Labels: auto-deprioritized-major, starter
>
> Some e2e tests parse the Flink logs for exceptions using {{grep}}. We then 
> print the first 500 lines of each log file in case an exception that 
> shouldn't be ignored is found (see 
> [internal_check_logs_for_exceptions|https://github.com/apache/flink/blob/c9e87fe410c42f7e7c19c81456d4212a58564f5e/flink-end-to-end-tests/test-scripts/common.sh#L449]
>  or 
> [check_logs_for_errors|https://github.com/apache/flink/blob/c9e87fe410c42f7e7c19c81456d4212a58564f5e/flink-end-to-end-tests/test-scripts/common.sh#L387]).
>  Instead, we could use {{grep -C200}} to actually print the context of the 
> exception.
> This would help especially in those situations where the exception doesn't 
> appear in the first 500 lines.
> This issue does not necessarily only include the two aforementioned code 
> locations. One should check the scripts for other code with a similar issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27429) Implement Vertica JDBC Dialect

2023-09-23 Thread Jasmin Redzepovic (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jasmin Redzepovic updated FLINK-27429:
--
Labels: pull-request-available  (was: pull-request-available stale-assigned)

> Implement Vertica JDBC Dialect
> --
>
> Key: FLINK-27429
> URL: https://issues.apache.org/jira/browse/FLINK-27429
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Affects Versions: 1.16.0
>Reporter: Jasmin Redzepovic
>Assignee: Jasmin Redzepovic
>Priority: Minor
>  Labels: pull-request-available
>
> In order to use Vertica database as a JDBC source or sink, corresponding 
> dialect has to be implemented.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)