[jira] [Commented] (FLINK-31856) Add support for Opensearch Connector REST client customization

2023-06-12 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-31856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731895#comment-17731895
 ] 

Michael Hempel-Jørgensen commented on FLINK-31856:
--

[~reta] we have tested the PR and it solves our use case, so that is great (y)

The main improvement we could suggest: in order to add a call to 
httpClientBuilder.addInterceptorFirst(), we had to copy the whole 
builder.setHttpClientConfigCallback() block (i.e. lines 45-75 in 
DefaultRestClientFactory).

I'm not sure if anything can be done to alleviate that so we can somehow 
inherit the default behaviour, but in general the PR is still a great 
improvement.
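
To illustrate, a minimal sketch of what we ended up writing; the factory class 
name and the tokenSupplier are our own placeholders, not part of the PR's API:

{code:java}
import org.apache.http.HttpRequestInterceptor;
import org.opensearch.client.RestClientBuilder;

import java.util.function.Supplier;

public class BearerTokenRestClientFactory {

    // Refreshed externally, so an expiring token can be swapped at runtime.
    private final Supplier<String> tokenSupplier;

    public BearerTokenRestClientFactory(Supplier<String> tokenSupplier) {
        this.tokenSupplier = tokenSupplier;
    }

    public void configure(RestClientBuilder builder) {
        builder.setHttpClientConfigCallback(
                httpClientBuilder -> {
                    HttpRequestInterceptor bearer =
                            (request, context) ->
                                    request.setHeader(
                                            "Authorization",
                                            "Bearer " + tokenSupplier.get());
                    httpClientBuilder.addInterceptorFirst(bearer);
                    // ...followed by a copy of the default configuration from
                    // DefaultRestClientFactory (lines 45-75), because setting
                    // this callback replaces the default one entirely.
                    return httpClientBuilder;
                });
    }
}
{code}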

> Add support for Opensearch Connector REST client customization
> --
>
> Key: FLINK-31856
> URL: https://issues.apache.org/jira/browse/FLINK-31856
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.0
>Reporter: Michael Hempel-Jørgensen
>Assignee: Andriy Redko
>Priority: Minor
>  Labels: pull-request-available
>
> It is not currently possible to customise the Opensearch REST client in all 
> of the connectors.
> We are currently using the OpensearchSink in 
> [connector/opensearch/sink/OpensearchSink.java|https://github.com/apache/flink-connector-opensearch/blob/main/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java]
>  and need to be able to authenticate/authorise with Opensearch using OAuth2, 
> and therefore need to be able to pass a bearer token to the bulk index calls.
> The access token will expire and change during the job's lifetime, so it must 
> be possible to handle this, i.e. giving a token when building the sink is not 
> enough.
> For reference see the mailing list discussion: 
> https://lists.apache.org/thread/9rvwhzjwzm6yq9mg481sdxqx9nqr1x5g



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId

2023-06-12 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731876#comment-17731876
 ] 

Benchao Li commented on FLINK-32320:


[~aitozi] Then you need to open a dedicated Calcite issue, give a clear 
definition and description of the problem on the Calcite side, and move the 
discussion there. Note that in Calcite, a clear description of the problem is 
more important than the way to fix it.

> Same correlate can not be reused due to the different correlationId
> ---
>
> Key: FLINK-32320
> URL: https://issues.apache.org/jira/browse/FLINK-32320
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> As described in SubplanReuserTest
> {code:java}
>   @Test
>   def testSubplanReuseOnCorrelate(): Unit = {
> util.addFunction("str_split", new StringSplit())
> val sqlQuery =
>   """
> |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, 
> '-')) AS T(v))
> |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
>   """.stripMargin
> // TODO the sub-plan of Correlate should be reused,
> // however the digests of Correlates are different
> util.verifyExecPlan(sqlQuery)
>   }
> {code}
> This will produce the plan 
> {code:java}
> HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, 
> b0, c0, f00], build=[right])
> :- Exchange(distribution=[hash[f0]])
> :  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[f0]])
>+- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
>   +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> The Correlate node can not be reused due to the different correlation id.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-kafka] yuxiqian opened a new pull request, #32: [FLINK-32289][docs] Fix incorrect metadata column type in examples

2023-06-12 Thread via GitHub


yuxiqian opened a new pull request, #32:
URL: https://github.com/apache/flink-connector-kafka/pull/32

   This PR fixes an incorrect metadata column type in the Kafka connector 
examples.
   
   ## Brief change log
   
   Changed the timestamp metadata column type from TIMESTAMP to TIMESTAMP_LTZ 
(timestamp with local time zone) in the [Kafka SQL connector 
examples](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/kafka/#how-to-create-a-kafka-table)
 in both the English and Chinese docs.
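   
   For reference, a sketch of the corrected declaration (table name and payload 
columns here are illustrative; the `timestamp` metadata key and its 
`TIMESTAMP_LTZ(3)` type follow the connector docs):
   
   ```sql
   CREATE TABLE KafkaTable (
     -- the Kafka record timestamp surfaces as TIMESTAMP_LTZ(3), not TIMESTAMP(3)
     `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
     `user_id` BIGINT,
     `behavior` STRING
   ) WITH (
     'connector' = 'kafka',
     'topic' = 'user_behavior',
     'properties.bootstrap.servers' = 'localhost:9092',
     'format' = 'json'
   );
   ```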


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Myasuka commented on a diff in pull request #22767: FLIP-296: Extend watermark-related features for SQL

2023-06-12 Thread via GitHub


Myasuka commented on code in PR #22767:
URL: https://github.com/apache/flink/pull/22767#discussion_r1227481717


##
docs/content.zh/docs/dev/table/concepts/time_attributes.md:
##
@@ -272,6 +272,99 @@ GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
 
 ```
 
 Advanced watermark features in SQL
+In previous versions, many advanced watermark features (such as watermark 
alignment) were easy to use through the DataStream API but hard to use in SQL, 
so in release 1.18 we extended these features so that users can also use them 
in SQL.
+
+{{< hint warning >}}
+**Note:** Only source connectors that implement the 
`SupportsWatermarkPushDown` interface (e.g. kafka, pulsar) can use these 
advanced features. If a source connector does not implement the 
`SupportsWatermarkPushDown` interface but the job is configured with these 
options, the job still runs normally, but the options take no effect.
+{{< /hint >}}
+
+# I. Configuring the watermark emit strategy
+There are two watermark emit strategies in Flink:
+
+- on-periodic: emit watermarks periodically
+- on-event: emit a watermark for every event
+
+In the DataStream API, users can choose between them through the 
WatermarkGenerator interface ([Custom WatermarkGenerator]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#自定义-watermarkgenerator)). For SQL jobs, watermarks are emitted 
periodically by default, with a default interval of 200ms that can be changed 
via the `pipeline.auto-watermark-interval` option. To emit a watermark for 
every event instead, configure the source table as follows:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.emit.strategy'='on-event',
+  ...
+);
+```
+
+Of course, this can also be configured with an `OPTIONS` hint:
+```sql
+-- use 'OPTIONS' hint
+select ... from source_table /*+ 
OPTIONS('scan.watermark.emit.strategy'='on-periodic') */
+```
+
+# II. Configuring the source idle timeout
+
+If a partition/split of a source does not send event data for a while, the 
`WatermarkGenerator` gets no new data from which to generate watermarks; we 
call such a source an idle input or idle source. This becomes a problem if 
some other partitions are still sending event data, because a downstream 
operator computes its watermark as the minimum of the watermarks of all its 
upstream parallel sources: since the idle split/partition produces no new 
watermark, the job's watermark stops advancing. If a source idle timeout is 
configured, a partition/split that sends no event data within the timeout is 
marked as idle, and downstream operators ignore this idle source when 
computing new watermarks, so the watermark can keep advancing.
+
+In SQL, the `table.exec.source.idle-timeout` option defines a global timeout 
that applies to every source. If you want a different idle timeout per source, 
you can set it directly on the source table:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.idle-timeout'='1min',
+  ...
+);
+```
+
+Or use the `OPTIONS` hint:
+```sql
+-- use 'OPTIONS' hint
+select ... from source_table /*+ OPTIONS('scan.watermark.idle-timeout'='1min') 
*/
+```
+
+# III. Watermark alignment
+Due to factors such as data distribution or machine load, different 
partitions/splits of the same source, or different sources, may be consumed at 
different speeds. If there are stateful operators downstream, they may have to 
buffer more of the faster-consumed data in state while waiting for the slower 
data, so the state can grow very large; inconsistent consumption rates can 
also cause more severe out-of-orderness, which may hurt the accuracy of window 
computations. Watermark alignment covers these scenarios: it ensures that the 
watermark of one split/partition of a source table does not advance too far 
ahead of the others, avoiding the problems above. Note that watermark 
alignment affects job performance, depending on how much the consumption of 
the different sources diverges.
+
+In a SQL job, watermark alignment can be configured on the source table:
+
+```sql
+-- configure in table options
+CREATE TABLE user_actions (
+  ...
+  user_action_time TIMESTAMP(3),
+  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
+) WITH (
+  'scan.watermark.alignment.group'='alignment-group-1',
+  'scan.watermark.alignment.max-drift'='1min',
+  'scan.watermark.alignment.update-interval'='1s',
+  ...
+);
+```
+
+Of course, you can still use the `OPTIONS` hint:
+
+```sql
+-- use 'OPTIONS' hint
+select ... from source_table /*+ 
OPTIONS('scan.watermark.alignment.group'='alignment-group-1', 
'scan.watermark.alignment.max-drift'='1min', 
'scan.watermark.alignment.update-interval'='1s') */
+```
+
+There are three options here:
+
+- `scan.watermark.alignment.group` configures the alignment group name; 
sources in the same group will be aligned with each other
+- `scan.watermark.alignment.max-drift` configures the maximum drift a 
split/partition is allowed from the aligned time
+- `scan.watermark.alignment.update-interval` configures how often the aligned 
time is computed; optional, defaulting to 1s
+
+{{< hint warning >}}
+**Note:** If the source connector does not implement 
[FLIP-217](https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits) 
but watermark alignment is used, the job will throw an exception at runtime. 
Users can set pipeline.watermark-arignment.Allow-naligned-Source-Splits=true 
to disable watermark alignment of source splits; in that case, watermark 
alignment only works properly when the number of splits equals the source 
parallelism.

Review Comment:
   Wrong parameter name; it should be 
`pipeline.watermark-alignment.allow-unaligned-source-splits`.



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/watermark/WatermarkParams.java:
##
@@ -26,27 +38,93 @@ public void setEmitStrategy(WatermarkEmitStrategy 
emitStrategy) {
 this.emitStrategy = emitStrategy;
 }
 
+public String getAlignGroupName() {
+return alignGroupName;
+}
+
+public void setAlignGroupName(String alignGroupName) {
+this.alignGroupName = alignGroupName;
+}
+
+public Duration getAlignMaxDrift() {
+return alignMaxDrift;
+}
+
+public void setAlignMaxDrift(Duration alignMaxDrift) {
+this.alignMaxDrift = alignMaxDrift;
+}
+
+public Duration getAlignUpdateInterval() {
+return alignUpdateInterval;
+}
+
+public void setAlignUpdateInterval(Duration alignUpdateInterval) {
+this.alignUpdateInterval = alignUpdateInterval;
+}
+
+public boolean alignWatermarkEnabled() {
+return !StringUtils.isNullOrWhitespaceOnly(alignGroupName)
+&& 

[GitHub] [flink] flinkbot commented on pull request #22768: [FLINK-32309][sql-gateway] Use independent resource manager for table environment

2023-06-12 Thread via GitHub


flinkbot commented on PR #22768:
URL: https://github.com/apache/flink/pull/22768#issuecomment-1588509273

   
   ## CI report:
   
   * f4d5c01b209db532ed82e2cd9d50941abe5dfb01 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32309) Shared classpaths and jars manager for jobs in sql gateway cause conflicts

2023-06-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32309:
---
Labels: pull-request-available  (was: )

> Shared classpaths and jars manager for jobs in sql gateway cause conflicts
> 
>
> Key: FLINK-32309
> URL: https://issues.apache.org/jira/browse/FLINK-32309
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: FangYong
>Priority: Major
>  Labels: pull-request-available
>
> Currently all jobs in the same session of the SQL gateway share the resource 
> manager, which provides the classpath for the jobs. After a job is performed, 
> its classpath and jars remain in the shared resource manager and are used by 
> the next jobs. This may put too many unnecessary jars in a job or even cause 
> conflicts. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] FangYongs opened a new pull request, #22768: [FLINK-32309][sql-gateway] Use independent resource manager for table environment

2023-06-12 Thread via GitHub


FangYongs opened a new pull request, #22768:
URL: https://github.com/apache/flink/pull/22768

   
   ## What is the purpose of the change
   
   This PR aims to create a new resource manager for the table environment in 
the sql gateway.
   
   ## Brief change log
   
 - Create a new resource manager which shares the classloader and local 
path with the previous manager
 - Create a table environment for each operation in the gateway with the new 
resource manager
 - Add jar- and function-related operations in the gateway, which will be 
performed on the session resource manager.
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Updated `SqlGatewayServiceITCase.testListUserDefinedFunctions` to test 
`create` functions.
 - Updated `SqlGatewayServiceITCase.testConfigureSessionWithLegalStatement` 
to check that the jars are empty if a query does not use a UDF
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32321) Temporal Join job missing condition after “ON”

2023-06-12 Thread macdoor615 (Jira)
macdoor615 created FLINK-32321:
--

 Summary: Temporal Join job missing condition after “ON”
 Key: FLINK-32321
 URL: https://issues.apache.org/jira/browse/FLINK-32321
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Gateway
Affects Versions: 1.17.1
Reporter: macdoor615
 Fix For: 1.17.2


We have a SQL job like this:
{code:java}
select ... from prod_kafka.f_alarm_tag_dev
 /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */ as f 
left join mysql_bnpmp.gem_bnpmp.f_a_alarm_filter
 /*+ OPTIONS('lookup.cache.max-rows' = '5000',
 'lookup.cache.ttl' = '30s') */
FOR SYSTEM_TIME AS OF f.proctime ff  on ff.rule_type = 0 and f.ne_ip = ff.ip 
{code}
We submit it to a Flink 1.17.1 cluster via sql-gateway. We found that the job 
detail is missing the lookup condition (rule_type = 0):
{code:java}
  +- [1196]:LookupJoin(table=[mysql_bnpmp.gem_bnpmp.f_a_alarm_filter], 
joinType=[LeftOuterJoin], lookup=[ip=ne_ip], select=[event_id, {code}
We submit the same SQL to a Flink 1.17.0 cluster via sql-gateway. There, the 
(rule_type = 0) lookup condition is present:
{code:java}
  +- [3]:LookupJoin(table=[mysql_bnpmp.gem_bnpmp.f_a_alarm_filter], 
joinType=[LeftOuterJoin], lookup=[rule_type=0, ip=ne_ip], where=[(rule_type = 
0)], select=[event_id, severity,{code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] leonardBang commented on pull request #22760: [FLINK-32289][kafka] Fix incorrect metadata column type in docs

2023-06-12 Thread via GitHub


leonardBang commented on PR #22760:
URL: https://github.com/apache/flink/pull/22760#issuecomment-1588469379

   @yuxiqian Could you also open a PR for the 
https://github.com/apache/flink-connector-kafka repo, as we plan to move the 
kafka connector there from the flink main repo?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] leonardBang merged pull request #22760: [FLINK-32289][kafka] Fix incorrect metadata column type in docs

2023-06-12 Thread via GitHub


leonardBang merged PR #22760:
URL: https://github.com/apache/flink/pull/22760


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId

2023-06-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731832#comment-17731832
 ] 

Aitozi commented on FLINK-32320:


[~libenchao] Thanks for your attention. I just made a quick fix on the Calcite 
side when creating a new correlationId: if we are in the same scope with the 
same identifier, we reuse the same correlationId as before. It works as 
expected. What do you think of this solution?
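
To make the idea concrete, a sketch only (the scope type below is a 
placeholder, not an actual Calcite class):

{code:java}
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Sketch: reuse the correlationId for repeated toRel() conversions of the
// same lateral table in the same scope, so the Correlate digests match and
// the sub-plan can be deduplicated.
class CorrelationIdCache {

    // Key: (validator scope, table identifier); value: the assigned id.
    private final Map<SimpleImmutableEntry<Object, String>, String> cache =
            new HashMap<>();
    private int nextId = 0;

    String correlationIdFor(Object scope, String identifier) {
        return cache.computeIfAbsent(
                new SimpleImmutableEntry<>(scope, identifier),
                k -> "$cor" + nextId++);
    }
}
{code}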

> Same correlate can not be reused due to the different correlationId
> ---
>
> Key: FLINK-32320
> URL: https://issues.apache.org/jira/browse/FLINK-32320
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> As described in SubplanReuserTest
> {code:java}
>   @Test
>   def testSubplanReuseOnCorrelate(): Unit = {
> util.addFunction("str_split", new StringSplit())
> val sqlQuery =
>   """
> |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, 
> '-')) AS T(v))
> |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
>   """.stripMargin
> // TODO the sub-plan of Correlate should be reused,
> // however the digests of Correlates are different
> util.verifyExecPlan(sqlQuery)
>   }
> {code}
> This will produce the plan 
> {code:java}
> HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, 
> b0, c0, f00], build=[right])
> :- Exchange(distribution=[hash[f0]])
> :  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[f0]])
>+- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
>   +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> The Correlate node can not be reused due to the different correlation id.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32319) flink can't request the partition of network after restart

2023-06-12 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731830#comment-17731830
 ] 

Wencong Liu commented on FLINK-32319:
-

If this exception occurs, it indicates that the upstream vertex initializes its 
ResultPartition more slowly than the downstream vertex requests it. This 
happens when the upstream vertex is short of CPU/memory resources. We should 
give the LocalInputChannel more chances to request the partition, so we can 
increase "taskmanager.network.request-backoff.max" to raise the maximum 
backoff from 10s to a bigger value. [~1026688210] 
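
For example, in flink-conf.yaml (120000 here is just an illustrative value; 
tune it to how long recovery takes in your environment):
{code}
taskmanager.network.request-backoff.initial: 100
taskmanager.network.request-backoff.max: 120000
{code}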

> flink can't request the partition of network after restart
> --
>
> Key: FLINK-32319
> URL: https://issues.apache.org/jira/browse/FLINK-32319
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.1
> Environment: centos 7.
> jdk 8.
> flink1.17.1 application mode on yarn 
> flink configuration :
> ```
> $internal.application.program-args sql2
> $internal.deployment.config-dir   /data/home/flink/wgcn/flink-1.17.1/conf
> $internal.yarn.log-config-file
> /data/home/flink/wgcn/flink-1.17.1/conf/log4j.properties
> akka.ask.timeout  100s
> blob.server.port  15402
> classloader.check-leaked-classloader  false
> classloader.resolve-order parent-first
> env.java.opts.taskmanager -XX:+UseG1GC -XX:MaxGCPauseMillis=1000 
> execution.attached true
> execution.checkpointing.aligned-checkpoint-timeout 10 min
> execution.checkpointing.externalized-checkpoint-retention 
> RETAIN_ON_CANCELLATION
> execution.checkpointing.interval  10 min
> execution.checkpointing.min-pause 10 min
> execution.savepoint-restore-mode  NO_CLAIM
> execution.savepoint.ignore-unclaimed-state false
> execution.shutdown-on-attached-exit   false
> execution.target  embedded
> high-availability zookeeper
> high-availability.cluster-id  application_1684133071014_7202676
> high-availability.storageDir  hdfs:///user/flink/recovery
> high-availability.zookeeper.path.root /flink
> high-availability.zookeeper.quorum x
> internal.cluster.execution-mode   NORMAL
> internal.io.tmpdirs.use-local-default true
> io.tmp.dirs   
> /data1/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data3/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data4/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data5/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data6/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data7/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data8/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data9/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data10/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data11/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data12/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676
> jobmanager.execution.failover-strategy region
> jobmanager.memory.heap.size   9261023232b
> jobmanager.memory.jvm-metaspace.size  268435456b
> jobmanager.memory.jvm-overhead.max1073741824b
> jobmanager.memory.jvm-overhead.min1073741824b
> jobmanager.memory.off-heap.size   134217728b
> jobmanager.memory.process.size10240m
> jobmanager.rpc.address
> jobmanager.rpc.port   31332
> metrics.reporter.promgateway.deleteOnShutdown true
> metrics.reporter.promgateway.factory.class
> org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
> metrics.reporter.promgateway.hostUrl  :9091
> metrics.reporter.promgateway.interval 60 SECONDS
> metrics.reporter.promgateway.jobName  join_phase3_v7
> metrics.reporter.promgateway.randomJobNameSuffix  false
> parallelism.default   128
> pipeline.classpaths   
> pipeline.jars 
> file:/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676/container_e16_1684133071014_7202676_01_02/frauddetection-0.1.jar
> rest.address  
> rest.bind-address x
> rest.bind-port 5-50500
> rest.flamegraph.enabled   true
> restart-strategy.failure-rate.delay   10 s
> restart-strategy.failure-rate.failure-rate-interval   1 min
> restart-strategy.failure-rate.max-failures-per-interval   6
> restart-strategy.type exponential-delay
> state.backend.type filesystem
> state.checkpoints.dir hdfs://xx/user/flink/checkpoints-data/wgcn
> state.checkpoints.num-retained3
> taskmanager.memory.managed.fraction   0
> taskmanager.memory.network.max600mb
> taskmanager.memory.process.size   10240m
> 

[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId

2023-06-12 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731829#comment-17731829
 ] 

Benchao Li commented on FLINK-32320:


You are correct, this will miss some opportunities for reusing common table 
expressions, including a {{VIEW}} that is used multiple times.

> Same correlate can not be reused due to the different correlationId
> ---
>
> Key: FLINK-32320
> URL: https://issues.apache.org/jira/browse/FLINK-32320
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> As described in SubplanReuserTest
> {code:java}
>   @Test
>   def testSubplanReuseOnCorrelate(): Unit = {
> util.addFunction("str_split", new StringSplit())
> val sqlQuery =
>   """
> |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, 
> '-')) AS T(v))
> |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
>   """.stripMargin
> // TODO the sub-plan of Correlate should be reused,
> // however the digests of Correlates are different
> util.verifyExecPlan(sqlQuery)
>   }
> {code}
> This will produce the plan 
> {code:java}
> HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, 
> b0, c0, f00], build=[right])
> :- Exchange(distribution=[hash[f0]])
> :  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[f0]])
>+- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
>   +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> The Correlate node can not be reused due to the different correlation id.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] KarmaGYZ commented on a diff in pull request #22137: [FLINK-31233][Rest] Return 404 when task manager stdout/log files not exist.

2023-06-12 Thread via GitHub


KarmaGYZ commented on code in PR #22137:
URL: https://github.com/apache/flink/pull/22137#discussion_r1227428286


##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java:
##
@@ -116,12 +85,11 @@ public class AbstractTaskManagerFileHandlerTest extends 
TestLogger {
 
 private TransientBlobKey transientBlobKey2;
 
-@BeforeClass
+@BeforeAll
 public static void setup() throws IOException, HandlerRequestException {

Review Comment:
   Why do we need to keep the public modifier? Same as below.



##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java:
##
@@ -151,34 +151,12 @@ protected CompletableFuture respondToRequest(
 },
 ctx.executor());
 
-return resultFuture.whenComplete(
+return resultFuture.handle(
 (Void ignored, Throwable throwable) -> {
 if (throwable != null) {
-log.error(
-"Failed to transfer file from TaskExecutor 
{}.",
-taskManagerId,
-throwable);
-fileBlobKeys.invalidate(taskManagerId);
-
-final Throwable strippedThrowable =
-
ExceptionUtils.stripCompletionException(throwable);
-
-if (strippedThrowable instanceof 
UnknownTaskExecutorException) {
-throw new CompletionException(
-new NotFoundException(
-String.format(
-"Failed to transfer file 
from TaskExecutor %s because it was unknown.",
-taskManagerId),
-strippedThrowable));
-} else {
-throw new CompletionException(
-new FlinkException(
-String.format(
-"Failed to transfer file 
from TaskExecutor %s.",
-taskManagerId),
-strippedThrowable));
-}
+handleException(ctx, httpRequest, throwable, 
taskManagerId);
 }
+return ignored;

Review Comment:
   I'm not sure why we need to return here.



##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelHandlerContext.java:
##
@@ -46,11 +48,21 @@ class TestingChannelHandlerContext implements 
ChannelHandlerContext {
 
 final File outputFile;
 final ChannelPipeline pipeline = new TestingChannelPipeline();
+HttpResponse httpResponse;
+byte[] responseData;

Review Comment:
   Ditto. private modifier.



##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java:
##
@@ -165,6 +168,81 @@ void testFileCacheExpiration() throws Exception {
 assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(fileContent2);
 }
 
+@Test
+void testStdoutFileHandlerHandleFileNotFoundException() throws Exception {
+final Time cacheEntryDuration = Time.milliseconds(1000L);
+TestingTaskManagerStdoutFileHandler 
testingTaskManagerStdoutFileHandler =
+createTestTaskManagerStdoutFileHandler(
+cacheEntryDuration, new FileNotFoundException("file 
not found"));
+final File outputFile = TempDirUtils.newFile(temporaryFolder.toPath());
+final TestingChannelHandlerContext testingContext =
+new TestingChannelHandlerContext(outputFile);
+
+CompletableFuture handleFuture =
+testingTaskManagerStdoutFileHandler.respondToRequest(
+testingContext, HTTP_REQUEST, handlerRequest, null);
+assertThat(handleFuture).isCompleted();
+assertThat(testingContext.getHttpResponse())
+.isNotNull()
+.satisfies(
+httpResponse ->
+
assertThat(httpResponse.status()).isEqualTo(HttpResponseStatus.OK));
+assertThat(testingContext.getResponseData())
+.isNotNull()
+.satisfies(
+data ->
+assertThat(new String(data, "UTF-8"))
+.isEqualTo(
+
TaskManagerStdoutFileHandler.FILE_NOT_FOUND_INFO));
+}
+
+@Test
+void testStdoutFileHandlerHandleOtherException() throws Exception {
+

[GitHub] [flink] flinkbot commented on pull request #22767: FLIP-296: Extend watermark-related features for SQL

2023-06-12 Thread via GitHub


flinkbot commented on PR #22767:
URL: https://github.com/apache/flink/pull/22767#issuecomment-1588429310

   
   ## CI report:
   
   * 26d4747481a26ae4b192b7744e48a6757ddb2391 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] WenDing-Y commented on pull request #49: [FLINK-32068] connector jdbc support clickhouse

2023-06-12 Thread via GitHub


WenDing-Y commented on PR #49:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/49#issuecomment-1588427649

   @MartijnVisser the pr is ready


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yuchengxin opened a new pull request, #22767: Feature/watermark params

2023-06-12 Thread via GitHub


yuchengxin opened a new pull request, #22767:
URL: https://github.com/apache/flink/pull/22767

   
   
   ## What is the purpose of the change
   
   The PR proposes to extend watermark-related features in the SQL layer; these 
features have been implemented in the DataStream API but are not available in 
SQL yet.
   The extended features are as follows:
 
 1. configurable watermark emit strategy in sql
 2. dealing with idle sources in sql
 3. configurable watermark alignment in sql
   
   
   ## Brief change log
   
   - Added `org.apache.flink.table.watermark.WatermarkParams` to express the 
parameters of extended watermark-related features
   - Added and validated related params in 
`org.apache.flink.table.factories.FactoryUtil`
   - Parse out the watermark-related configs from hints or table options in 
`org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanRuleBase`
   - Handle watermark-related features in 
`org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec`
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added test cases in 
`org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanRuleTest`
 to test using these extended features with SQL hints or table options
   - The serialization and deserialization of `DynamicTableSourceSpec` is 
covered by existing tests: 
`org.apache.flink.table.planner.plan.nodes.exec.serde.DynamicTableSourceSpecSerdeTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) yes
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) yes
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented) docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32319) flink can't request the partition of network after restart

2023-06-12 Thread wgcn (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731827#comment-17731827
 ] 

wgcn commented on FLINK-32319:
--

hi~ [~Wencong Liu]  thanks for your response, I will try the config. I have a 
question: I just roughly looked at the meaning of this config. Why does this 
config need to be set so large? Is it related to parallel reading? We are 
using version 1.12 of Flink in our production environment, and I have never 
paid attention to this config before. Is this because some mechanisms were 
added after version 1.12?

> flink can't request the partition of network after restart
> --
>
> Key: FLINK-32319
> URL: https://issues.apache.org/jira/browse/FLINK-32319
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.1
> Environment: centos 7.
> jdk 8.
> flink1.17.1 application mode on yarn 
> flink configuration :
> ```
> $internal.application.program-args sql2
> $internal.deployment.config-dir   /data/home/flink/wgcn/flink-1.17.1/conf
> $internal.yarn.log-config-file
> /data/home/flink/wgcn/flink-1.17.1/conf/log4j.properties
> akka.ask.timeout  100s
> blob.server.port  15402
> classloader.check-leaked-classloader  false
> classloader.resolve-order parent-first
> env.java.opts.taskmanager -XX:+UseG1GC -XX:MaxGCPauseMillis=1000 
> execution.attached true
> execution.checkpointing.aligned-checkpoint-timeout 10 min
> execution.checkpointing.externalized-checkpoint-retention 
> RETAIN_ON_CANCELLATION
> execution.checkpointing.interval  10 min
> execution.checkpointing.min-pause 10 min
> execution.savepoint-restore-mode  NO_CLAIM
> execution.savepoint.ignore-unclaimed-state false
> execution.shutdown-on-attached-exit   false
> execution.target  embedded
> high-availability zookeeper
> high-availability.cluster-id  application_1684133071014_7202676
> high-availability.storageDir  hdfs:///user/flink/recovery
> high-availability.zookeeper.path.root /flink
> high-availability.zookeeper.quorum x
> internal.cluster.execution-mode   NORMAL
> internal.io.tmpdirs.use-local-default true
> io.tmp.dirs   
> /data1/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data3/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data4/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data5/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data6/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data7/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data8/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data9/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data10/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data11/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data12/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676
> jobmanager.execution.failover-strategy region
> jobmanager.memory.heap.size   9261023232b
> jobmanager.memory.jvm-metaspace.size  268435456b
> jobmanager.memory.jvm-overhead.max1073741824b
> jobmanager.memory.jvm-overhead.min1073741824b
> jobmanager.memory.off-heap.size   134217728b
> jobmanager.memory.process.size10240m
> jobmanager.rpc.address
> jobmanager.rpc.port   31332
> metrics.reporter.promgateway.deleteOnShutdown true
> metrics.reporter.promgateway.factory.class
> org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
> metrics.reporter.promgateway.hostUrl  :9091
> metrics.reporter.promgateway.interval 60 SECONDS
> metrics.reporter.promgateway.jobName  join_phase3_v7
> metrics.reporter.promgateway.randomJobNameSuffix  false
> parallelism.default   128
> pipeline.classpaths   
> pipeline.jars 
> file:/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676/container_e16_1684133071014_7202676_01_02/frauddetection-0.1.jar
> rest.address  
> rest.bind-address x
> rest.bind-port 5-50500
> rest.flamegraph.enabled   true
> restart-strategy.failure-rate.delay   10 s
> restart-strategy.failure-rate.failure-rate-interval   1 min
> restart-strategy.failure-rate.max-failures-per-interval   6
> restart-strategy.type exponential-delay
> state.backend.type filesystem
> state.checkpoints.dir hdfs://xx/user/flink/checkpoints-data/wgcn
> state.checkpoints.num-retained3
> taskmanager.memory.managed.fraction   0
> taskmanager.memory.network.max600mb
> taskmanager.memory.process.size   10240m
> 

[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-32316:

Description: 
When we tried the SourceAlignment feature, we found that a duplicated 
announceCombinedWatermark task gets scheduled after the JobManager fails over 
and automatically recovers the job from a checkpoint.

The reason, I think, is that we should schedule the announceCombinedWatermark 
task in the SourceCoordinator::start function, not in the SourceCoordinator 
constructor (see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when the JobManager encounters a failover and auto-recovers the 
job, it creates the SourceCoordinator twice:
 * The first time: when the JobMaster is created, it creates the 
DefaultExecutionGraph, which initializes the first sourceCoordinator but does 
not start it.
 * The second time: the JobMaster calls the 
restoreLatestCheckpointedStateInternal method, which resets the old 
sourceCoordinator and initializes a new one; but because the first 
sourceCoordinator was never started (a SourceCoordinator is started before 
SchedulerBase::startScheduling), the first SourceCoordinator is never fully 
closed.

 

And we also found another problem: announceCombinedWatermark may throw an 
exception (like "subtask 25 is not ready yet to receive events"; this subtask 
may be under failover), which stops the periodic task from running any more (a 
ThreadPoolExecutor will not reschedule a periodic task if it throws an 
exception). I think we should increase the robustness of the 
announceCombinedWatermark function to cover this case (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
 )

  was:
When we tried the SourceAlignment feature, we found that a duplicated 
announceCombinedWatermark task gets scheduled after the JobManager fails over 
and automatically recovers the job from a checkpoint.

The reason, I think, is that we should schedule the announceCombinedWatermark 
task in the SourceCoordinator::start function, not in the SourceCoordinator 
constructor (see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when the JobManager encounters a failover and auto-recovers the 
job, it creates the SourceCoordinator twice:
 * The first time: when the JobMaster is created, it creates the 
DefaultExecutionGraph, which initializes the first sourceCoordinator but does 
not start it.
 * The second time: the JobMaster calls the 
restoreLatestCheckpointedStateInternal method, which resets the old 
sourceCoordinator and initializes a new one; but because the first 
sourceCoordinator was never started (a SourceCoordinator is started before 
SchedulerBase::startScheduling), the first SourceCoordinator is never fully 
closed.

 


> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we tried the SourceAlignment feature, we found that a duplicated 
> announceCombinedWatermark task gets scheduled after the JobManager fails over 
> and automatically recovers the job from a checkpoint.
> The reason, I think, is that we should schedule the announceCombinedWatermark 
> task in the SourceCoordinator::start function, not in the SourceCoordinator 
> constructor (see  
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when the JobManager encounters a failover and auto-recovers the 
> job, it creates the SourceCoordinator twice:
>  * The first time: when the JobMaster is created, it creates the 
> DefaultExecutionGraph, which initializes the first sourceCoordinator but does 
> not start it.
>  * The second time: the JobMaster calls the 
> restoreLatestCheckpointedStateInternal method, which resets the old 
> sourceCoordinator and initializes a new one; but because the first 
> sourceCoordinator was never started (a SourceCoordinator is started before 
> SchedulerBase::startScheduling), the first SourceCoordinator is never fully 
> closed.
>  
> And we also found another problem: announceCombinedWatermark may throw an 
> exception (like "subtask 25 is not ready yet to receive events"; this subtask 
> may be under failover), which stops the periodic task from running any more 
> (a ThreadPoolExecutor will not reschedule a periodic task if it throws an 
> exception). I think we should increase the robustness of 
> announceCombinedWatermark 

[jira] [Commented] (FLINK-28076) StreamFaultToleranceTestBase runs into timeout

2023-06-12 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731828#comment-17731828
 ] 

Rui Fan commented on FLINK-28076:
-

{quote}[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49865=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=17896]
{quote}
Thanks [~Sergey Nuyanzin] for reporting this. This log failed to upload, not 
sure why it timed out.

 

> StreamFaultToleranceTestBase runs into timeout
> --
>
> Key: FLINK-28076
> URL: https://issues.apache.org/jira/browse/FLINK-28076
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [Build 
> #36259|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36259=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=12054]
>  got stuck in a {{StreamFaultToleranceTestBase}} subclass:
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7eff8c00b800 nid=0xf1a waiting on 
> condition [0x7eff92eca000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x82213498> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult(TestUtils.java:92)
>   at 
> org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase.runCheckpointedProgram(StreamFaultToleranceTestBase.java:136)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> [...]
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId

2023-06-12 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731823#comment-17731823
 ] 

Aitozi commented on FLINK-32320:


In production, multi-sink jobs are very common. If the table produced by a 
UDTF is queried multiple times, the function will be executed multiple times 
(as shown in the plan), which leads to bad performance.

After some research, I think it is caused by the following: during the 
sqlToRel process, the table function SqlNode is converted via `toRel` multiple 
times, which leads to different correlationIds.


> Same correlate can not be reused due to the different correlationId
> ---
>
> Key: FLINK-32320
> URL: https://issues.apache.org/jira/browse/FLINK-32320
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> As described in SubplanReuserTest
> {code:java}
>   @Test
>   def testSubplanReuseOnCorrelate(): Unit = {
> util.addFunction("str_split", new StringSplit())
> val sqlQuery =
>   """
> |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, 
> '-')) AS T(v))
> |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
>   """.stripMargin
> // TODO the sub-plan of Correlate should be reused,
> // however the digests of Correlates are different
> util.verifyExecPlan(sqlQuery)
>   }
> {code}
> This will produce the plan 
> {code:java}
> HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, 
> b0, c0, f00], build=[right])
> :- Exchange(distribution=[hash[f0]])
> :  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[f0]])
>+- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
>   +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> The Correlate node can not be reused due to the different correlation id.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32320) Same correlate can not be reused due to the different correlationId

2023-06-12 Thread Aitozi (Jira)
Aitozi created FLINK-32320:
--

 Summary: Same correlate can not be reused due to the different 
correlationId
 Key: FLINK-32320
 URL: https://issues.apache.org/jira/browse/FLINK-32320
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


As described in SubplanReuserTest


{code:java}
  @Test
  def testSubplanReuseOnCorrelate(): Unit = {
util.addFunction("str_split", new StringSplit())
val sqlQuery =
  """
|WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) 
AS T(v))
|SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
  """.stripMargin
// TODO the sub-plan of Correlate should be reused,
// however the digests of Correlates are different
util.verifyExecPlan(sqlQuery)
  }
{code}

This will produce the plan 


{code:java}
HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, b0, 
c0, f00], build=[right])
:- Exchange(distribution=[hash[f0]])
:  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f0]])
   +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
  +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}

The Correlate node can not be reused due to the different correlation id.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32319) flink can't request the partition of network after restart

2023-06-12 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731822#comment-17731822
 ] 

Wencong Liu commented on FLINK-32319:
-

Hi [~1026688210] , maybe you could try the config 
"taskmanager.network.request-backoff.max: 20000", because its default value is 
10000.

> flink can't request the partition of network after restart
> --
>
> Key: FLINK-32319
> URL: https://issues.apache.org/jira/browse/FLINK-32319
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.1
> Environment: centos 7.
> jdk 8.
> flink1.17.1 application mode on yarn 
> flink configuration :
> ```
> $internal.application.program-args sql2
> $internal.deployment.config-dir   /data/home/flink/wgcn/flink-1.17.1/conf
> $internal.yarn.log-config-file
> /data/home/flink/wgcn/flink-1.17.1/conf/log4j.properties
> akka.ask.timeout  100s
> blob.server.port  15402
> classloader.check-leaked-classloader  false
> classloader.resolve-order parent-first
> env.java.opts.taskmanager -XX:+UseG1GC -XX:MaxGCPauseMillis=1000 
> execution.attached true
> execution.checkpointing.aligned-checkpoint-timeout 10 min
> execution.checkpointing.externalized-checkpoint-retention 
> RETAIN_ON_CANCELLATION
> execution.checkpointing.interval  10 min
> execution.checkpointing.min-pause 10 min
> execution.savepoint-restore-mode  NO_CLAIM
> execution.savepoint.ignore-unclaimed-state false
> execution.shutdown-on-attached-exit   false
> execution.target  embedded
> high-availability zookeeper
> high-availability.cluster-id  application_1684133071014_7202676
> high-availability.storageDir  hdfs:///user/flink/recovery
> high-availability.zookeeper.path.root /flink
> high-availability.zookeeper.quorum x
> internal.cluster.execution-mode   NORMAL
> internal.io.tmpdirs.use-local-default true
> io.tmp.dirs   
> /data1/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data3/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data4/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data5/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data6/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data7/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data8/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data9/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data10/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data11/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data12/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676
> jobmanager.execution.failover-strategy region
> jobmanager.memory.heap.size   9261023232b
> jobmanager.memory.jvm-metaspace.size  268435456b
> jobmanager.memory.jvm-overhead.max1073741824b
> jobmanager.memory.jvm-overhead.min1073741824b
> jobmanager.memory.off-heap.size   134217728b
> jobmanager.memory.process.size10240m
> jobmanager.rpc.address
> jobmanager.rpc.port   31332
> metrics.reporter.promgateway.deleteOnShutdown true
> metrics.reporter.promgateway.factory.class
> org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
> metrics.reporter.promgateway.hostUrl  :9091
> metrics.reporter.promgateway.interval 60 SECONDS
> metrics.reporter.promgateway.jobName  join_phase3_v7
> metrics.reporter.promgateway.randomJobNameSuffix  false
> parallelism.default   128
> pipeline.classpaths   
> pipeline.jars 
> file:/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676/container_e16_1684133071014_7202676_01_02/frauddetection-0.1.jar
> rest.address  
> rest.bind-address x
> rest.bind-port 5-50500
> rest.flamegraph.enabled   true
> restart-strategy.failure-rate.delay   10 s
> restart-strategy.failure-rate.failure-rate-interval   1 min
> restart-strategy.failure-rate.max-failures-per-interval   6
> restart-strategy.type exponential-delay
> state.backend.type filesystem
> state.checkpoints.dir hdfs://xx/user/flink/checkpoints-data/wgcn
> state.checkpoints.num-retained3
> taskmanager.memory.managed.fraction   0
> taskmanager.memory.network.max600mb
> taskmanager.memory.process.size   10240m
> taskmanager.memory.segment-size   128kb
> taskmanager.network.memory.buffers-per-channel 8
> taskmanager.network.memory.floating-buffers-per-gate  800
> taskmanager.numberOfTaskSlots 2
> web.port  0
> web.tmpdir

[jira] [Commented] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins

2023-06-12 Thread Natea Eshetu Beshada (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731817#comment-17731817
 ] 

Natea Eshetu Beshada commented on FLINK-32318:
--

Hi [~luismacosta], I believe you need to update your Dockerfile to include 
something like this:
{code:java}
COPY --from=build 
/app/flink-kubernetes-plugins/target/dependency/flink-s3-fs-presto* 
/opt/flink/plugins/s3/ {code}
 
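
Alternatively, assuming you build on the stock Flink image (where the plugin 
jar already ships under /opt/flink/opt), the usual pattern is to copy it into 
its own plugins subfolder:
{code}
FROM flink:1.17
# Flink loads each plugin from its own directory under /opt/flink/plugins.
RUN mkdir -p /opt/flink/plugins/s3-fs-presto && \
    cp /opt/flink/opt/flink-s3-fs-presto-*.jar /opt/flink/plugins/s3-fs-presto/
{code}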

> [flink-operator] missing s3 plugin in folder plugins
> 
>
> Key: FLINK-32318
> URL: https://issues.apache.org/jira/browse/FLINK-32318
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: Luís Costa
>Priority: Minor
>
> Greetings,
> I'm trying to configure [Flink's Kubernetes HA 
> services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/]
>  for flink operator jobs, but got an error regarding s3 plugin: _"Could not 
> find a file system implementation for scheme 's3'. The scheme is directly 
> supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
> flink-s3-fs-presto"_
> {code:java}
> 2023-06-12 10:05:16,981 INFO  akka.remote.Remoting
>  [] - Starting remoting
> 2023-06-12 10:05:17,194 INFO  akka.remote.Remoting
>  [] - Remoting started; listening on addresses 
> :[akka.tcp://flink@10.4.125.209:6123]
> 2023-06-12 10:05:17,377 INFO  
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor 
> system started at akka.tcp://flink@10.4.125.209:6123
> 2023-06-12 10:05:18,175 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
> KubernetesApplicationClusterEntrypoint down with application status FAILED. 
> Diagnostics org.apache.flink.util.FlinkException: Could not create the ha 
> services from the instantiated HighAvailabilityServicesFactory 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
>   at 
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299)
>   at 
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285)
>   at 
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145)
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439)
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382)
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
>   at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
>   at 
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
> Caused by: java.io.IOException: Could not create FileSystem for highly 
> available storage path 
> (s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto)
>   at 
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
>   at 
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86)
>   at 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41)
>   at 
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296)
>   ... 10 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
> Could not find a file system implementation for scheme 's3'. The scheme is 
> directly supported by Flink through the following plugin(s): 
> flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin 
> resides within its own subfolder within the plugins directory. See 
> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
>  for more information. If you want to use a Hadoop file system for that 
> scheme, please add the scheme to the configuration 
> fs.allowed-fallback-filesystems. For a full list of supported file systems, 
> please see 

[jira] [Updated] (FLINK-32319) flink can't find the network partition after restart

2023-06-12 Thread wgcn (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wgcn updated FLINK-32319:
-
Environment: 
centos 7.
jdk 8.
flink1.17.1 application mode on yarn 

flink configuration :

```

$internal.application.program-args  sql2
$internal.deployment.config-dir  /data/home/flink/wgcn/flink-1.17.1/conf
$internal.yarn.log-config-file  /data/home/flink/wgcn/flink-1.17.1/conf/log4j.properties
akka.ask.timeout  100s
blob.server.port  15402
classloader.check-leaked-classloader  false
classloader.resolve-order  parent-first
env.java.opts.taskmanager  -XX:+UseG1GC -XX:MaxGCPauseMillis=1000
execution.attached  true
execution.checkpointing.aligned-checkpoint-timeout  10 min
execution.checkpointing.externalized-checkpoint-retention  RETAIN_ON_CANCELLATION
execution.checkpointing.interval  10 min
execution.checkpointing.min-pause  10 min
execution.savepoint-restore-mode  NO_CLAIM
execution.savepoint.ignore-unclaimed-state  false
execution.shutdown-on-attached-exit  false
execution.target  embedded
high-availability  zookeeper
high-availability.cluster-id  application_1684133071014_7202676
high-availability.storageDir  hdfs:///user/flink/recovery
high-availability.zookeeper.path.root  /flink
high-availability.zookeeper.quorum  x
internal.cluster.execution-mode  NORMAL
internal.io.tmpdirs.use-local-default  true
io.tmp.dirs  /data1/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data3/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data4/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data5/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data6/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data7/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data8/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data9/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data10/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data11/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data12/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676
jobmanager.execution.failover-strategy  region
jobmanager.memory.heap.size  9261023232b
jobmanager.memory.jvm-metaspace.size  268435456b
jobmanager.memory.jvm-overhead.max  1073741824b
jobmanager.memory.jvm-overhead.min  1073741824b
jobmanager.memory.off-heap.size  134217728b
jobmanager.memory.process.size  10240m
jobmanager.rpc.address
jobmanager.rpc.port  31332
metrics.reporter.promgateway.deleteOnShutdown  true
metrics.reporter.promgateway.factory.class  org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl  :9091
metrics.reporter.promgateway.interval  60 SECONDS
metrics.reporter.promgateway.jobName  join_phase3_v7
metrics.reporter.promgateway.randomJobNameSuffix  false
parallelism.default  128
pipeline.classpaths
pipeline.jars  file:/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676/container_e16_1684133071014_7202676_01_02/frauddetection-0.1.jar
rest.address
rest.bind-address  x
rest.bind-port  5-50500
rest.flamegraph.enabled  true
restart-strategy.failure-rate.delay  10 s
restart-strategy.failure-rate.failure-rate-interval  1 min
restart-strategy.failure-rate.max-failures-per-interval  6
restart-strategy.type  exponential-delay
state.backend.type  filesystem
state.checkpoints.dir  hdfs://xx/user/flink/checkpoints-data/wgcn
state.checkpoints.num-retained  3
taskmanager.memory.managed.fraction  0
taskmanager.memory.network.max  600mb
taskmanager.memory.process.size  10240m
taskmanager.memory.segment-size  128kb
taskmanager.network.memory.buffers-per-channel  8
taskmanager.network.memory.floating-buffers-per-gate  800
taskmanager.numberOfTaskSlots  2
web.port  0
web.tmpdir  /tmp/flink-web-1b87445e-2761-4f16-97a1-8d4fc6fa8534
yarn.application-attempt-failures-validity-interval  6
yarn.application-attempts  3
yarn.application.name  join_phase3_v7
yarn.heartbeat.container-request-interval  700
```

  was:
centos 7.
jdk 8.
flink1.17.1 application mode on yarn 


> flink can't find the network partition after restart
> --
>
> Key: FLINK-32319
> URL: https://issues.apache.org/jira/browse/FLINK-32319
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.1
> Environment: centos 7.
> jdk 8.
> flink1.17.1 application mode on 

[jira] [Updated] (FLINK-32319) flink can't find the network partition after restart

2023-06-12 Thread wgcn (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wgcn updated FLINK-32319:
-
   Attachment: image-2023-06-13-07-14-48-958.png
  Component/s: Runtime / Network
Affects Version/s: 1.17.1
  Description: 
flink can't find the network partition after restart, which prevents the job 
from restoring
 !image-2023-06-13-07-14-48-958.png! 
  Environment: 
centos 7.
jdk 8.
flink1.17.1 application mode on yarn 

> flink can't find the network partition after restart
> --
>
> Key: FLINK-32319
> URL: https://issues.apache.org/jira/browse/FLINK-32319
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.1
> Environment: centos 7.
> jdk 8.
> flink1.17.1 application mode on yarn 
>Reporter: wgcn
>Priority: Major
> Attachments: image-2023-06-13-07-14-48-958.png
>
>
> flink can't find the network partition after restart, which prevents the job 
> from restoring
>  !image-2023-06-13-07-14-48-958.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32319) flink can't find the network partition after restart

2023-06-12 Thread wgcn (Jira)
wgcn created FLINK-32319:


 Summary: flink can't find the network partition after restart
 Key: FLINK-32319
 URL: https://issues.apache.org/jira/browse/FLINK-32319
 Project: Flink
  Issue Type: Bug
Reporter: wgcn






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] architgyl commented on a diff in pull request #22509: [FLINK-31983] Add yarn Acls capability to Flink containers

2023-06-12 Thread via GitHub


architgyl commented on code in PR #22509:
URL: https://github.com/apache/flink/pull/22509#discussion_r1227304307


##
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java:
##
@@ -60,6 +60,12 @@
 class YARNSessionFIFOITCase extends YarnTestBase {
 private static final Logger log = 
LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
 
+protected static final String VIEW_ACLS = "user group";
+protected static final String MODIFY_ACLS = "admin groupAdmin";
+protected static final String VIEW_ACLS_WITH_WILDCARD = "user,* group";
+protected static final String MODIFY_ACLS_WITH_WILDCARD = "admin,* 
groupAdmin";

Review Comment:
   @becketqin addressed the comments. Using `*` makes sense rather than 
`user1,user2,*`. Could you please review?
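
   For reference, a minimal sketch of how a job could supply such ACL strings 
through the Flink configuration. The option keys `yarn.view.acls` and 
`yarn.modify.acls` are assumptions based on this PR and may not match what is 
finally merged:

   ```java
   import org.apache.flink.configuration.Configuration;

   public class YarnAclConfigExample {
       public static void main(String[] args) {
           Configuration conf = new Configuration();
           // ACL strings follow the Hadoop "<users> <groups>" convention;
           // "*" grants access to everyone.
           conf.setString("yarn.view.acls", "user group");         // assumed option key
           conf.setString("yarn.modify.acls", "admin groupAdmin"); // assumed option key
           System.out.println(conf);
       }
   }
   ```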



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-training] ness-marichamyramachandran opened a new pull request, #62: FLK-1036 - Flink Level 1 - Module 1 Lab Exercises

2023-06-12 Thread via GitHub


ness-marichamyramachandran opened a new pull request, #62:
URL: https://github.com/apache/flink-training/pull/62

   RideCleansingExercise
   - Changed to filter the rides so that only those in the New York City area 
are kept.
 
   RidesAndFaresExercise
   - The following changes have been made:
 1. Added NYCFilter to keep only the rides from the New York City area.
 2. Implemented EnrichmentFunction to maintain per-ride state so that each 
ride is matched with its corresponding fare. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-training] ness-marichamyramachandran closed pull request #62: FLK-1036 - Flink Level 1 - Module 1 Lab Exercises

2023-06-12 Thread via GitHub


ness-marichamyramachandran closed pull request #62: FLK-1036 - Flink Level 1 - 
Module 1 Lab Exercises
URL: https://github.com/apache/flink-training/pull/62


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31990) Use Flink Configuration to specify KDS Source configuration object

2023-06-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31990:
---
Labels: pull-request-available  (was: )

> Use Flink Configuration to specify KDS Source configuration object
> --
>
> Key: FLINK-31990
> URL: https://issues.apache.org/jira/browse/FLINK-31990
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
>
> *What*
> Use the Flink Configuration object to standardise the method of specifying 
> configurations for the KDS source. 
>  
> Also include validations:
> - Check that region in config matches ARN. ARN should take priority.
>  
> *Why*
> We want to standardise error messages + source serialization methods 
> implemented by Flink on the Flink Configuration objects.
>  
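
A rough sketch of the region-vs-ARN validation described above (illustrative 
only, not the actual implementation):
{code:java}
public class ArnRegionCheckExample {
    public static void main(String[] args) {
        String streamArn = "arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name";
        String configuredRegion = "eu-west-1";
        // ARN layout: arn:partition:service:region:account-id:resource
        String arnRegion = streamArn.split(":")[3];
        if (!arnRegion.equals(configuredRegion)) {
            // Per the ticket, the ARN takes priority over the configured region.
            System.out.println("Region mismatch, using ARN region: " + arnRegion);
        }
    }
}
{code}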



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] hlteoh37 opened a new pull request, #77: [FLINK-31990][Connectors/Kinesis] Use Configuration object instead of…

2023-06-12 Thread via GitHub


hlteoh37 opened a new pull request, #77:
URL: https://github.com/apache/flink-connector-aws/pull/77

   … properties in KDS Source
   
   ## Purpose of the change
   
   Use Configuration object for specifying Source config instead of Properties.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   ## Significant changes
   *(Please check any boxes [x] if the answer is "yes". You can first publish 
the PR and check them afterwards, for convenience.)*
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
 - If yes, how is this documented? (not applicable / docs / JavaDocs / not 
documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] architgyl closed pull request #22766: Yarn acl

2023-06-12 Thread via GitHub


architgyl closed pull request #22766: Yarn acl
URL: https://github.com/apache/flink/pull/22766


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] architgyl opened a new pull request, #22766: Yarn acl

2023-06-12 Thread via GitHub


architgyl opened a new pull request, #22766:
URL: https://github.com/apache/flink/pull/22766

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] architgyl closed pull request #22765: Yarn acl

2023-06-12 Thread via GitHub


architgyl closed pull request #22765: Yarn acl
URL: https://github.com/apache/flink/pull/22765


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] architgyl opened a new pull request, #22765: Yarn acl

2023-06-12 Thread via GitHub


architgyl opened a new pull request, #22765:
URL: https://github.com/apache/flink/pull/22765

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pvary commented on a diff in pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support

2023-06-12 Thread via GitHub


pvary commented on code in PR #22694:
URL: https://github.com/apache/flink/pull/22694#discussion_r1227217764


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveDelegationTokenProvider.java:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Optional;
+
+/** Delegation token provider for Hive. */
+@Internal
+public class HiveDelegationTokenProvider implements DelegationTokenProvider {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(HiveDelegationTokenProvider.class);
+
+org.apache.hadoop.conf.Configuration hiveConf;
+
+private KerberosLoginProvider kerberosLoginProvider;
+
+private static final Text TOKEN_ALIAS = new 
Text("hive.server2.delegation.token");
+
+@Override
+public String serviceName() {
+return "HiveServer2";
+}
+
+@Override
+public void init(Configuration configuration) throws Exception {
+hiveConf = getHiveConfiguration(configuration);
+kerberosLoginProvider = new KerberosLoginProvider(configuration);
+}
+
+private org.apache.hadoop.conf.Configuration 
getHiveConfiguration(Configuration conf) {
+try {
+org.apache.hadoop.conf.Configuration hadoopConf =
+HadoopUtils.getHadoopConfiguration(conf);
+hiveConf = new HiveConf(hadoopConf, HiveConf.class);
+} catch (Exception | NoClassDefFoundError e) {
+LOG.warn("Fail to create Hive Configuration", e);
+}
+return hiveConf;
+}
+
+@Override
+public boolean delegationTokensRequired() throws Exception {
+/**
+ * The general rule how a provider/receiver must behave is the 
following: The provider and
+ * the receiver must be added to the classpath together with all the 
additionally required
+ * dependencies.
+ *
+ * This null check is required because the Hive provider is always 
on classpath but Hive
+ * jars are optional. Such case configuration is not able to be 
loaded. This construct is
+ * intended to be removed when Hive provider/receiver pair can be 
externalized (namely if a
+ * provider/receiver throws an exception then workload must be 
stopped).
+ */
+if (hiveConf == null) {
+LOG.debug(
+"Hive is not available (not packaged with this 
application), hence no "
++ "tokens will be acquired.");
+return false;
+}
+try {
+if 
(!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) 
{
+return false;
+}
+} catch (IOException e) {
+LOG.debug("Hadoop Kerberos is not enabled.");
+return false;
+}
+return !hiveConf.getTrimmed("hive.metastore.uris", "").isEmpty()
+&& kerberosLoginProvider.isLoginPossible(false);
+}
+
+@Override
+public ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
+UserGroupInformation freshUGI = 
kerberosLoginProvider.doLoginAndReturnUGI();
+

[GitHub] [flink-connector-jdbc] MartijnVisser merged pull request #57: [BP-3.1][FLINK-30790][Connector/JDBC] Add DatabaseExtension with TableManaged or testing.

2023-06-12 Thread via GitHub


MartijnVisser merged PR #57:
URL: https://github.com/apache/flink-connector-jdbc/pull/57


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] snuyanzin opened a new pull request, #57: [BP-3.1][FLINK-30790][Connector/JDBC] Add DatabaseExtension with TableManaged or testing.

2023-06-12 Thread via GitHub


snuyanzin opened a new pull request, #57:
URL: https://github.com/apache/flink-connector-jdbc/pull/57

   This is a cherry-pick backport of 
https://github.com/apache/flink-connector-jdbc/pull/22 to 3.1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32313) CrateDB relies on flink-shaded in flink-connector-jdbc

2023-06-12 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-32313.
--
Fix Version/s: jdbc-3.2.0
 Assignee: Sergey Nuyanzin
   Resolution: Fixed

Fixed in main: 28016fdec16621e37b0e42322b1a542ca79343f8

> CrateDB relies on flink-shaded in flink-connector-jdbc
> --
>
> Key: FLINK-32313
> URL: https://issues.apache.org/jira/browse/FLINK-32313
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Reporter: Martijn Visser
>Assignee: Sergey Nuyanzin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: jdbc-3.2.0
>
>
> See 
> https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java#L27
>  - JDBC shouldn't rely on flink-shaded. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-jdbc] boring-cyborg[bot] commented on pull request #56: [FLINK-32313] Remove usage of flink-shaded from the code, add deprecation on checkstyle level

2023-06-12 Thread via GitHub


boring-cyborg[bot] commented on PR #56:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/56#issuecomment-1587890117

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] MartijnVisser merged pull request #56: [FLINK-32313] Remove usage of flink-shaded from the code, add deprecation on checkstyle level

2023-06-12 Thread via GitHub


MartijnVisser merged PR #56:
URL: https://github.com/apache/flink-connector-jdbc/pull/56


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-12 Thread via GitHub


hanyuzheng7 commented on PR #22717:
URL: https://github.com/apache/flink/pull/22717#issuecomment-1587886884

   @snuyanzin @bvarghese1 
   For case 2 (different types) and case 3 (wrong/incompatible types): when the 
arrays contain null elements I have not found an effective way to handle this, 
because array[array[1,2]] and array[array[1, null]] have the data types 
ARRAY[ARRAY NOT NULL] NOT NULL and ARRAY[ARRAY] NOT NULL respectively. They 
obviously do not share the same data type, so we cannot concatenate them.
   Previously they could be concatenated only because I did not check whether 
their data types matched. Once that check is in place, array[array[1,2]] and 
array[array[1, null]] turn out to have different data types, so the result of 
concat(array[array[1,2]], array[array[1, null]]) should not be [array[1,2], 
array[1, null]]; it should be null.
   The current version also catches situations like array_concat(ARRAY['2'], 
ARRAY[1]): the result is null because the data types differ.
   If instead we wanted concat(array[array[1,2]], array[array[1, null]]) to 
return [array[1,2], array[1, null]], it would become much harder, because every 
NULL marker would have to be removed from the data type. With nullability 
handling I can only strip the outermost NULL, e.g. change ARRAY[ARRAY NOT NULL] 
NOT NULL -> ARRAY[ARRAY NOT NULL], but I have not found a way to remove the 
inner NULLs in this version of Flink. In the worst case the arrays are deeply 
nested, e.g. array[array[array[array[array[array[1,2]]]]]], and the inner NULLs 
cannot be removed.
   So there are two ways to handle case 2 (different types) and case 3 
(wrong/incompatible types):
   1. Do not allow any null element in any array.
   2. Strictly require the same input type; e.g. array[array[1,2]] and 
array[array[1, null]] have different data types, so the result should be null.
   The newest version uses the second way. Is that OK for you?
   Thank you.
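
   A minimal sketch of the behaviour described above, assuming the 
`ARRAY_CONCAT` function from this PR is registered in the session:

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class ArrayConcatTypeExample {
       public static void main(String[] args) {
           TableEnvironment env =
                   TableEnvironment.create(EnvironmentSettings.inStreamingMode());
           // Same element type on both sides: concatenation succeeds.
           env.executeSql("SELECT ARRAY_CONCAT(ARRAY[1, 2], ARRAY[3, 4])").print();
           // Incompatible element types (STRING vs INT): under the proposed
           // semantics the result is NULL instead of an error.
           env.executeSql("SELECT ARRAY_CONCAT(ARRAY['2'], ARRAY[1])").print();
       }
   }
   ```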
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31980) Implement support for EFO in new Source

2023-06-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31980:
---
Labels: pull-request-available  (was: )

> Implement support for EFO in new Source
> ---
>
> Key: FLINK-31980
> URL: https://issues.apache.org/jira/browse/FLINK-31980
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
>
> Implement support for reading from Kinesis Stream using Enhanced Fan Out 
> mechanism



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] hlteoh37 opened a new pull request, #76: [FLINK-31980] Implement support for EFO in Kinesis consumer

2023-06-12 Thread via GitHub


hlteoh37 opened a new pull request, #76:
URL: https://github.com/apache/flink-connector-aws/pull/76

   ## Purpose of the change
   
   Implement support for EFO in the new KDS source
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - Added unit tests
   - Manually verified by running the Kinesis connector on a local Flink 
cluster.
   
   ## Significant changes
   *(Please check any boxes [x] if the answer is "yes". You can first publish 
the PR and check them afterwards, for convenience.)*
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
 - If yes, how is this documented? (not applicable / docs / JavaDocs / not 
documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31813) Initial implementation of Kinesis Source using FLIP-27

2023-06-12 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731720#comment-17731720
 ] 

Danny Cranmer commented on FLINK-31813:
---

Merged commit 
[{{43e1295}}|https://github.com/apache/flink-connector-aws/commit/43e1295e402ed59e6db3689f6bd16b69e6f4e12e]
 into apache:main 

> Initial implementation of Kinesis Source using FLIP-27
> --
>
> Key: FLINK-31813
> URL: https://issues.apache.org/jira/browse/FLINK-31813
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
> Fix For: aws-connector-4.2.0
>
>
> Implement a base implementation of the Kinesis source based on FLIP-27
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31813) Initial implementation of Kinesis Source using FLIP-27

2023-06-12 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-31813:
-

Assignee: Hong Liang Teoh

> Initial implementation of Kinesis Source using FLIP-27
> --
>
> Key: FLINK-31813
> URL: https://issues.apache.org/jira/browse/FLINK-31813
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
> Fix For: aws-connector-4.2.0
>
>
> Implement a base implementation of the Kinesis source based on FLIP-27
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31813) Initial implementation of Kinesis Source using FLIP-27

2023-06-12 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-31813.
---
Resolution: Done

> Initial implementation of Kinesis Source using FLIP-27
> --
>
> Key: FLINK-31813
> URL: https://issues.apache.org/jira/browse/FLINK-31813
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hong Liang Teoh
>Assignee: Hong Liang Teoh
>Priority: Major
> Fix For: aws-connector-4.2.0
>
>
> Implement a base implementation of the Kinesis source based on FLIP-27
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] dannycranmer merged pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-06-12 Thread via GitHub


dannycranmer merged PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-06-12 Thread via GitHub


dannycranmer commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1227015927


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.UniformShardAssigner;
+import 
org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Builder to construct the {@link KinesisStreamsSource}.
+ *
+ * The following example shows the minimum setup to create a {@link 
KinesisStreamsSource} that
+ * reads String values from a Kinesis Data Streams stream with ARN of
+ * arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name.
+ *
+ * {@code
+ * KinesisStreamsSource kdsSource =
+ * KinesisStreamsSource.builder()
+ * 
.setStreamArn("arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name")
+ * .setDeserializationSchema(new SimpleStringSchema())

Review Comment:
   I think properties (`consumerConfig`) are required too 
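
   A minimal sketch of a builder call that also passes the consumer 
configuration; `setConsumerConfig` and the property key below are assumptions 
for illustration, not the confirmed API:

   ```java
   import java.util.Properties;

   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;

   public class KdsSourceExample {
       public static void main(String[] args) {
           Properties consumerConfig = new Properties();
           consumerConfig.setProperty("aws.region", "us-east-1"); // assumed key
           KinesisStreamsSource<String> source =
                   KinesisStreamsSource.<String>builder()
                           .setStreamArn("arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name")
                           .setDeserializationSchema(new SimpleStringSchema())
                           .setConsumerConfig(consumerConfig) // assumed setter
                           .build();
       }
   }
   ```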



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22743: [FLINK-26515][test] Add exception handling for RetryingExecutorTest#testDiscardOnTimeout

2023-06-12 Thread via GitHub


reswqa commented on code in PR #22743:
URL: https://github.com/apache/flink/pull/22743#discussion_r1227024446


##
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java:
##
@@ -118,6 +123,7 @@ public void handleFailure(Throwable throwable) {}
 Thread.sleep(10);
 }
 }
+assertThat(unexpectedException.get()).isNull();

Review Comment:
   nit:
   ```suggestion
   assertThat(unexpectedException).hasValue(null);
   ```
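
   (For an `AtomicReference`, `hasValue(null)` asserts the same condition as 
`get()` followed by `isNull()`, but it reports the reference itself in the 
failure message.)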



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins

2023-06-12 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-32318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luís Costa updated FLINK-32318:
---
Description: 
Greetings,

I'm trying to configure [Flink's Kubernetes HA 
services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/]
 for flink operator jobs, but got an error regarding s3 plugin: _"Could not 
find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto"_
{code:java}
2023-06-12 10:05:16,981 INFO  akka.remote.Remoting  
   [] - Starting remoting
2023-06-12 10:05:17,194 INFO  akka.remote.Remoting  
   [] - Remoting started; listening on addresses 
:[akka.tcp://flink@10.4.125.209:6123]
2023-06-12 10:05:17,377 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system 
started at akka.tcp://flink@10.4.125.209:6123
2023-06-12 10:05:18,175 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status FAILED. 
Diagnostics org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
at 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
Caused by: java.io.IOException: Could not create FileSystem for highly 
available storage path 
(s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
at 
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86)
at 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto. Please ensure that each plugin resides within its own 
subfolder within the plugins directory. See 
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
 for more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99)
... 13 more {code}
Looking into the job container, I can see that the s3 plugins are in the folder 
_/opt/flink/opt_ instead of {_}/opt/flink/plugins/s3{_}, as mentioned 
[here|https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/]
{code:java}
root@flink-basic-example-xpto1-86bb9b9d44-hksq8:/opt/flink# cd plugins/

[jira] [Closed] (FLINK-32213) Add get off heap buffer in memory segment

2023-06-12 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-32213.
--
Fix Version/s: 1.18.0
   Resolution: Done

master(1.18) via 81cab8f486997ef666128cce4903c24d44ac7534.

> Add get off heap buffer in memory segment
> -
>
> Key: FLINK-32213
> URL: https://issues.apache.org/jira/browse/FLINK-32213
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> When flink job writes data to data lake such as paimon, iceberg and hudi, the 
> sink will write data to writer buffer first, then flush the data to file 
> system. To manage the writer buffer better, we'd like to allocate segment 
> from managed memory in flink and get off heap buffer to create writer buffer
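
A minimal sketch of the intended usage, assuming the accessor added by this 
ticket is named {{getOffHeapBuffer()}}:
{code:java}
import java.nio.ByteBuffer;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class OffHeapBufferExample {
    public static void main(String[] args) {
        // Allocate an off-heap segment, standing in for managed memory here.
        MemorySegment segment =
                MemorySegmentFactory.allocateUnpooledOffHeapMemory(32 * 1024);
        // Assumption: FLINK-32213 exposes the underlying buffer like this.
        ByteBuffer writerBuffer = segment.getOffHeapBuffer();
        System.out.println("writer buffer capacity: " + writerBuffer.capacity());
    }
}
{code}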



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins

2023-06-12 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-32318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luís Costa updated FLINK-32318:
---
Description: 
Greetings,

I'm trying to configure [Flink's Kubernetes HA 
services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/]
 for flink operator jobs, but got an error regarding s3 plugin: _"Could not 
find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto"_
{code:java}
2023-06-12 10:05:16,981 INFO  akka.remote.Remoting  
   [] - Starting remoting
2023-06-12 10:05:17,194 INFO  akka.remote.Remoting  
   [] - Remoting started; listening on addresses 
:[akka.tcp://flink@10.4.125.209:6123]
2023-06-12 10:05:17,377 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system 
started at akka.tcp://flink@10.4.125.209:6123
2023-06-12 10:05:18,175 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status FAILED. 
Diagnostics org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
at 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
Caused by: java.io.IOException: Could not create FileSystem for highly 
available storage path 
(s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
at 
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86)
at 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto. Please ensure that each plugin resides within its own 
subfolder within the plugins directory. See 
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
 for more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99)
... 13 more {code}
Looking into the job container, can see that s3 plugins are in folder 
_/opt/flink/opt_ instead of {_}/opt/flink/plugins/s3{_}, as mentioned 
[here|https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/]
{code:java}
root@flink-basic-example-xpto1-86bb9b9d44-hksq8:/opt/flink# cd plugins/

[jira] [Updated] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins

2023-06-12 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-32318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luís Costa updated FLINK-32318:
---
Description: 
Greetings,

I'm trying to configure [Flink's Kubernetes HA 
services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/]
 for flink operator jobs, but got an error regarding s3 plugin: _"Could not 
find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto"_
{code:java}
2023-06-12 10:05:16,981 INFO  akka.remote.Remoting  
   [] - Starting remoting
2023-06-12 10:05:17,194 INFO  akka.remote.Remoting  
   [] - Remoting started; listening on addresses 
:[akka.tcp://flink@10.4.125.209:6123]
2023-06-12 10:05:17,377 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system 
started at akka.tcp://flink@10.4.125.209:6123
2023-06-12 10:05:18,175 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status FAILED. 
Diagnostics org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
at 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
Caused by: java.io.IOException: Could not create FileSystem for highly 
available storage path 
(s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
at 
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86)
at 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto. Please ensure that each plugin resides within its own 
subfolder within the plugins directory. See 
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
 for more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99)
... 13 more {code}


Looking into the job container, we can see that the s3 plugins are in the folder 
_/opt/flink/opt_ instead of _/opt/flink/plugins/s3_, as mentioned 
[here|https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/]


{code:java}
root@flink-basic-example-xpto1-86bb9b9d44-hksq8:/opt/flink# cd plugins/
{code}

[jira] [Updated] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins

2023-06-12 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-32318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luís Costa updated FLINK-32318:
---
Description: 
Greetings,

I'm trying to configure [Flink's Kubernetes HA 
services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/]
 for Flink operator jobs, but got an error regarding the s3 plugin: _"Could not 
find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto"_
{code:java}
2023-06-12 10:05:16,981 INFO  akka.remote.Remoting  
   [] - Starting remoting
2023-06-12 10:05:17,194 INFO  akka.remote.Remoting  
   [] - Remoting started; listening on addresses 
:[akka.tcp://flink@10.4.125.209:6123]
2023-06-12 10:05:17,377 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system 
started at akka.tcp://flink@10.4.125.209:6123
2023-06-12 10:05:18,175 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status FAILED. 
Diagnostics org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
at 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
Caused by: java.io.IOException: Could not create FileSystem for highly 
available storage path 
(s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
at 
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86)
at 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto. Please ensure that each plugin resides within its own 
subfolder within the plugins directory. See 
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
 for more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99)
... 13 more {code}
Looking into the container, we can see that the s3 plugins are in the folder 
_/opt/flink/opt_ instead of _/opt/flink/plugins/s3_, as mentioned 
[here|https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/]
{code:java}
root@flink-basic-example-xpto1-86bb9b9d44-hksq8:/opt/flink# cd plugins/
{code}

[jira] [Created] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins

2023-06-12 Thread Jira
Luís Costa created FLINK-32318:
--

 Summary: [flink-operator] missing s3 plugin in folder plugins
 Key: FLINK-32318
 URL: https://issues.apache.org/jira/browse/FLINK-32318
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Luís Costa


Greetings,

I'm trying to configure [Flink's Kubernetes HA 
services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/]
 for Flink operator jobs, but got an error regarding the s3 plugin: _"Could not 
find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto"_


{code:java}
2023-06-12 10:05:16,981 INFO  akka.remote.Remoting  
   [] - Starting remoting
2023-06-12 10:05:17,194 INFO  akka.remote.Remoting  
   [] - Remoting started; listening on addresses 
:[akka.tcp://flink@10.4.125.209:6123]
2023-06-12 10:05:17,377 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system 
started at akka.tcp://flink@10.4.125.209:6123
2023-06-12 10:05:18,175 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status FAILED. 
Diagnostics org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
at 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
Caused by: java.io.IOException: Could not create FileSystem for highly 
available storage path 
(s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
at 
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86)
at 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto. Please ensure that each plugin resides within its own 
subfolder within the plugins directory. See 
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
 for more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99)
... 13 more {code}

Looking into the container, we can see that the s3 plugins are in the folder 
/opt/flink/opt instead of /opt/flink/plugins/s3, as mentioned 

[jira] [Updated] (FLINK-32317) Enrich metadata in CR error field

2023-06-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32317:
---
Labels: pull-request-available  (was: )

> Enrich metadata in CR error field
> -
>
> Key: FLINK-32317
> URL: https://issues.apache.org/jira/browse/FLINK-32317
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: Daren Wong
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.6.0
>
>
> CR Error field is improved in 
> https://issues.apache.org/jira/browse/FLINK-29708.
> The error field is more structured with exception type, stackTrace, 
> additionalMetadata, etc.
>  
> This ticket is a proposal to expose a config 
> ("kubernetes.operator.exception.metadata.mapper") to enrich the 
> additionalMetadata further.
>  
> The config consists of key-value pairs, for example:
> {code:java}
> kubernetes.operator.exception.metadata.mapper: IOException:Found 
> IOException,403:Found 403 error code{code}
> The key is a REGEX string that is matched against the whole stack trace; if a 
> match is found, the value is added to additionalMetadata. For 
> example:
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: basic-session-job-example
>   namespace: default
>   resourceVersion: "70206149"
>   uid: 916ea8f5-0821-4839-9953-2db9678c3fc9
> spec:
>   deploymentName: basic-session-deployment-example
>   job:
> args: []
> jarURI: https://test-s3.s3.amazonaws.com/doubleExecute.jar
> parallelism: 4
> state: running
> upgradeMode: stateless
> status:
>   error: 
> '{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.IOException:
> Server returned HTTP response code: 403 for URL: 
> https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found
> 403 error code","Found 
> IOException"]},"throwableList":[{"type":"java.io.IOException","message":"Server
> returned HTTP response code: 403 for URL: 
> https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found
> 403 error code"]}}]}'
> ...
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] darenwkt opened a new pull request, #620: [FLINK-32317] Enrich metadata in CR error field

2023-06-12 Thread via GitHub


darenwkt opened a new pull request, #620:
URL: https://github.com/apache/flink-kubernetes-operator/pull/620

   
   
   ## What is the purpose of the change
   
   
   
   CR Error field is improved in 
https://issues.apache.org/jira/browse/FLINK-29708.
   
   The error field is more structured with exception type, stackTrace, 
additionalMetadata, etc.
   

   
   This ticket is a proposal to expose a config 
("kubernetes.operator.exception.metadata.mapper") to enrich the 
additionalMetadata further.
   

   
   The config consists of key-value pairs, for example:
   
   ```
   kubernetes.operator.exception.metadata.mapper: IOException:Found 
IOException,403:Found 403 error code
   ```
   
   The key is a REGEX string that is matched against the whole stack trace; if a 
match is found, the value is added to additionalMetadata. For example:
   
   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
 name: basic-session-job-example
 namespace: default
 resourceVersion: "70206149"
 uid: 916ea8f5-0821-4839-9953-2db9678c3fc9
   spec:
 deploymentName: basic-session-deployment-example
 job:
   args: []
   jarURI: https://test-s3.s3.amazonaws.com/doubleExecute.jar
   parallelism: 4
   state: running
   upgradeMode: stateless
   status:
 error: 
'{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.IOException:
   Server returned HTTP response code: 403 for URL: 
https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found
   403 error code","Found 
IOException"]},"throwableList":[{"type":"java.io.IOException","message":"Server
   returned HTTP response code: 403 for URL: 
https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found
   403 error code"]}}]}'
   ...
   ```
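
   To make the matching semantics concrete, here is a minimal standalone sketch 
of the mapper behaviour (the class and method names are illustrative, not the 
actual operator code): each configured regex key is searched in the stringified 
stack trace and, on a hit, its value is appended to additionalMetadata.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.regex.Pattern;

   // Illustrative helper, not the operator implementation.
   public class ExceptionMetadataMapper {
       // mappers: regex -> label, e.g. {"IOException" -> "Found IOException"}
       public static List<String> map(Map<String, String> mappers, String stackTrace) {
           List<String> labels = new ArrayList<>();
           for (Map.Entry<String, String> entry : mappers.entrySet()) {
               // find() searches anywhere in the stack trace string
               if (Pattern.compile(entry.getKey()).matcher(stackTrace).find()) {
                   labels.add(entry.getValue());
               }
           }
           return labels;
       }
   }
   ```

   For the 403 example above, `map(Map.of("IOException", "Found IOException", 
"403", "Found 403 error code"), stackTrace)` would yield both labels.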

   
   
   
   ## Brief change log
   
   - Added a new operator config for users to specify REGEX patterns to extract 
info from exceptions and enrich the CR error field with it.
   
   ## Verifying this change
   
   - Added unit test
   - Sanity test by building image and testing in minikube
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(no)
 - Core observer or reconciler logic that is regularly executed: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #617: [FLINK-32271] Report RECOMMENDED_PARALLELISM as an autoscaler metric V2

2023-06-12 Thread via GitHub


morhidi commented on code in PR #617:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/617#discussion_r1226911859


##
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java:
##
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.OperatorTestBase;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import lombok.Getter;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Test for recommended parallelism. */
+@EnableKubernetesMockClient(crud = true)
+public class RecommendedParallelismTest extends OperatorTestBase {

Review Comment:
   Thanks @mxm for your comprehensive review and feedback. I agree we 
can/should be more careful with tracking internal decisions upstream. I'll 
follow up on that, @gyfora, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-docker] daniel-packard commented on pull request #158: [ADD-SYMLINKS] Add symlinks for FLINK_VERSION to FLINK_RELEASE jars

2023-06-12 Thread via GitHub


daniel-packard commented on PR #158:
URL: https://github.com/apache/flink-docker/pull/158#issuecomment-1587654571

   Hey @zentol - 
   
   Right now we have two proposed changes on the table:
   1. add symlinks from e.g.  `1.16.jar` -> `1.16.2.jar`
   2. add support for pattern matching e.g. `1.16.*.jar`
   
   I think we should try to decide between: 
   - implement 1, 
   - implement 2, 
   - implement _both_, 
   - implement _neither_
   
   Both of the proposed changes would let the consumer of `flink:1.16` docker 
images ignore the detail of the patch version (e.g. `1.16.2`). 
   
   So I think the first question we should answer is... Is it a reasonable 
request that "consumers of `flink:1.16` docker images" should be able to 
"ignore the detail of the patch version"?
   
   In my mind, the answer is "yes" - because if I wanted to worry about patch 
version, I would use a more specific docker base image (`flink:1.16.2`) 
   
   But it's possible that safety/stability _demands_ that devs are aware of 
changing patch versions, in which case the answer would be no. But then I 
might ask why `flink:1.16` is even an option for a base image if we still have 
to worry about patch version changes?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hanyuzheng7 commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-12 Thread via GitHub


hanyuzheng7 commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1587632051

   @dawidwys Yes, "concatenates" was a mistake; I have already fixed it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dawidwys commented on a diff in pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-12 Thread via GitHub


dawidwys commented on code in PR #22730:
URL: https://github.com/apache/flink/pull/22730#discussion_r1226894432


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -272,6 +281,61 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayUnionFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_MAX =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_MAX")
+.kind(SCALAR)
+.inputTypeStrategy(
+new InputTypeStrategy() {

Review Comment:
   Yes, we should have that in a separate class.
   
   BTW, I don't think the current implementation checks comparability in the 
right way. You can take a look at (and probably reuse a lot of) 
https://github.com/confluentinc/flink/blob/ff7a0f6f25748933729bc94038444b3d4e233315/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java#L59
   
   I'd try to extract as much of the logic from there and reuse it.
   
   In the end I believe we should strive to have something similar to: 
https://github.com/confluentinc/flink/blob/ff7a0f6f25748933729bc94038444b3d4e233315/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java#L35
 where the constraint is applied to array's argument.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32317) Enrich metadata in CR error field

2023-06-12 Thread Daren Wong (Jira)
Daren Wong created FLINK-32317:
--

 Summary: Enrich metadata in CR error field
 Key: FLINK-32317
 URL: https://issues.apache.org/jira/browse/FLINK-32317
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Daren Wong
 Fix For: kubernetes-operator-1.6.0


CR Error field is improved in https://issues.apache.org/jira/browse/FLINK-29708.

The error field is more structured with exception type, stackTrace, 
additionalMetadata, etc.

 

This ticket is a proposal to expose a config 
("kubernetes.operator.exception.metadata.mapper") to enrich the 
additionalMetadata further.

 

The config consists of key-value pairs, for example:
{code:java}
kubernetes.operator.exception.metadata.mapper: IOException:Found 
IOException,403:Found 403 error code{code}
The key is a REGEX string that is matched against the whole stack trace; if a 
match is found, the value is added to additionalMetadata. For example:
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
  namespace: default
  resourceVersion: "70206149"
  uid: 916ea8f5-0821-4839-9953-2db9678c3fc9
spec:
  deploymentName: basic-session-deployment-example
  job:
args: []
jarURI: https://test-s3.s3.amazonaws.com/doubleExecute.jar
parallelism: 4
state: running
upgradeMode: stateless
status:
  error: 
'{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.IOException:
Server returned HTTP response code: 403 for URL: 
https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found
403 error code","Found 
IOException"]},"throwableList":[{"type":"java.io.IOException","message":"Server
returned HTTP response code: 403 for URL: 
https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found
403 error code"]}}]}'
...
{code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] dawidwys commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function

2023-06-12 Thread via GitHub


dawidwys commented on PR #22730:
URL: https://github.com/apache/flink/pull/22730#issuecomment-1587592181

   Thank you @hanyuzheng7 for the contribution. Could you please fill in the 
description template correctly? It helps reviewers tremendously. You have a 
nice overview of what your changes do, but it would be really helpful if you 
put it in the correct sections and fill in the rest.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32300) Support get object for result set

2023-06-12 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li closed FLINK-32300.
--
Fix Version/s: 1.18.0
   Resolution: Fixed

Fixed via 
https://github.com/apache/flink/commit/c7afa323582888ec90941e091f72b6ef3a599b13 
(master)

[~zjureel] Thanks for your PR!
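
For reference, a minimal usage sketch of getObject through the driver; the JDBC 
URL and port below are assumptions, adjust them to your SQL gateway endpoint:

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class GetObjectExample {
    public static void main(String[] args) throws Exception {
        // URL/port are assumptions, not defined by this change.
        try (Connection conn = DriverManager.getConnection("jdbc:flink://localhost:8083");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT 1 AS id, 'flink' AS name")) {
            while (rs.next()) {
                Object id = rs.getObject("id");   // by column label
                Object name = rs.getObject(2);    // by column index
                System.out.println(id + " / " + name);
            }
        }
    }
}
{code}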

> Support get object for result set
> -
>
> Key: FLINK-32300
> URL: https://issues.apache.org/jira/browse/FLINK-32300
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / JDBC
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Support get object for result set



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] libenchao closed pull request #22757: [FLINK-32300][jdbc-driver] Support get object for result set

2023-06-12 Thread via GitHub


libenchao closed pull request #22757: [FLINK-32300][jdbc-driver] Support get 
object for result set
URL: https://github.com/apache/flink/pull/22757


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock

2023-06-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731623#comment-17731623
 ] 

Matthias Pohl edited comment on FLINK-32311 at 6/12/23 2:10 PM:


This can, indeed, happen in the new implementation (even with the 
{{MultipleComponentLeaderElectionDriver}} implementation which is not used in 
the test run right now) because we call close on the driver within the lock. 
The old {{DefaultLeaderElectionService}} implementation didn't do that (see 
[DefaultLeaderElectionService:113|https://github.com/apache/flink/blob/release-1.17/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L113]
 in {{release-1.17}}).

The {{DefaultMultipleComponentLeaderElectionService}} implementation does close 
the driver in a lock, though. But it doesn't rely on the lock when processing 
the event (e.g. in 
[DefaultMultipleComponentLeaderElectionService:152|https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152]).
 I initially considered this a bug in the {{MultipleComponent*}} implementation: 
the event handling is done in a single thread to avoid locking, but the close 
method can still be called from another thread. 


was (Author: mapohl):
This can, indeed, happen in the new implementation (even with the 
{{MultipleComponentLeaderElectionDriver}} implementation which is not used in 
the test run right now) because we call close on the driver within the lock. 
The old {{DefaultLeaderElectionService}} implementation didn't do that (see 
[DefaultLeaderElectionService:113|https://github.com/apache/flink/blob/release-1.17/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L113]
 in {{release-1.17}}).

The {{DefaultMultipleComponentLeaderElectionService}} implementation does close 
the driver in a lock, though. But it doesn't rely on the lock when processing 
the event (e.g. in 
[DefaultMultipleComponentLeaderElectionService:152|https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152]).
 I initially considered this a bug in the {{MultipleComponent*}} implementation: 
the event handling is done in a single thread to avoid locking, but the close 
method can be called from another thread. 

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and 
> DefaultLeaderElectionService.onGrantLeadership fell into dead lock
> -
>
> Key: FLINK-32311
> URL: https://issues.apache.org/jira/browse/FLINK-32311
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8]
>  
> there are 2 threads one locked {{0xe3a8a1e8}} and waiting for 
> {{0xe3a89c18}}
> {noformat}
> 2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 
> "ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 
> tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry 
> [0x7f94b63e1000]
> 2023-06-08T01:18:54.5609820Z Jun 08 01:18:54java.lang.Thread.State: 
> BLOCKED (on object monitor)
> 2023-06-08T01:18:54.5610557Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425)
> 2023-06-08T01:18:54.5611459Z Jun 08 01:18:54  - waiting to lock 
> <0xe3a89c18> (a java.lang.Object)
> 2023-06-08T01:18:54.5612198Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300)
> 2023-06-08T01:18:54.5613110Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153)
> 2023-06-08T01:18:54.5614070Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$$Lambda$1649/586959400.accept(Unknown
>  Source)
> 2023-06-08T01:18:54.5615014Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.lambda$forEach$0(MappingListenerManager.java:92)
> 

[jira] [Comment Edited] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock

2023-06-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731623#comment-17731623
 ] 

Matthias Pohl edited comment on FLINK-32311 at 6/12/23 2:08 PM:


This can, indeed, happen in the new implementation (even with the 
{{MultipleComponentLeaderElectionDriver}} implementation which is not used in 
the test run right now) because we call close on the driver within the lock. 
The old {{DefaultLeaderElectionService}} implementation didn't do that (see 
[DefaultLeaderElectionService:113|https://github.com/apache/flink/blob/release-1.17/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L113]
 in {{release-1.17}}).

The {{DefaultMultipleComponentLeaderElectionService}} implementation does close 
the driver in a lock, though. But it doesn't rely on the lock when processing 
the event (e.g. in 
[DefaultMultipleComponentLeaderElectionService:152|https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152]).
 I initially considered this a bug in the {{MultipleComponent*}} implementation: 
the event handling is done in a single thread to avoid locking, but the close 
method can be called from another thread. 


was (Author: mapohl):
This can, indeed, happen in the new implementation (even with the 
{{MultipleComponentLeaderElectionDriver}} implementation which is not used in 
the test run right now) because we call close on the driver within the lock. 
The old {{DefaultLeaderElectionService}} implementation didn't do that (see 
[DefaultLeaderElectionService:113|https://github.com/apache/flink/blob/release-1.17/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L113]
 in {{release-1.17}}).

The {{DefaultMultipleComponentLeaderElectionService}} implementation does close 
the driver in a lock, though. But it doesn't rely on the lock when processing 
the event (e.g. in 
[DefaultMultipleComponentLeaderElectionService:152|https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152]).
 I initially considered this a bug in the {{MultipleComponent*}} implementation: 
the event handling is done in a single thread to avoid locking, but the close 
method can be called from another thread. 

But that thought might have been wrong: It should be enough to run the event 
triggering (rather than the event handling) and the close method in the lock. 
The close method shuts down the event processing entirely (which includes 
interrupting any outstanding event processing tasks); no events can be 
triggered afterwards anymore. I'm gonna go ahead and come up with a proposal 
here.

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and 
> DefaultLeaderElectionService.onGrantLeadership fell into dead lock
> -
>
> Key: FLINK-32311
> URL: https://issues.apache.org/jira/browse/FLINK-32311
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8]
>  
> there are 2 threads one locked {{0xe3a8a1e8}} and waiting for 
> {{0xe3a89c18}}
> {noformat}
> 2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 
> "ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 
> tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry 
> [0x7f94b63e1000]
> 2023-06-08T01:18:54.5609820Z Jun 08 01:18:54java.lang.Thread.State: 
> BLOCKED (on object monitor)
> 2023-06-08T01:18:54.5610557Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425)
> 2023-06-08T01:18:54.5611459Z Jun 08 01:18:54  - waiting to lock 
> <0xe3a89c18> (a java.lang.Object)
> 2023-06-08T01:18:54.5612198Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300)
> 2023-06-08T01:18:54.5613110Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153)
> 

[jira] [Commented] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock

2023-06-12 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731623#comment-17731623
 ] 

Matthias Pohl commented on FLINK-32311:
---

This can, indeed, happen in the new implementation (even with the 
{{MultipleComponentLeaderElectionDriver}} implementation which is not used in 
the test run right now) because we call close on the driver within the lock. 
The old {{DefaultLeaderElectionService}} implementation didn't do that (see 
[DefaultLeaderElectionService:113|https://github.com/apache/flink/blob/release-1.17/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L113]
 in {{release-1.17}}).

The {{DefaultMultipleComponentLeaderElectionService}} implementation does close 
the driver in a lock, though. But it doesn't rely on the lock when processing 
the event (e.g. in 
[DefaultMultipleComponentLeaderElectionService:152|https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152]).
 I initially considered this a bug in the {{MultipleComponent*}} implementation: 
the event handling is done in a single thread to avoid locking, but the close 
method can be called from another thread. 

But that thought might have been wrong: It should be enough to run the event 
triggering (rather than the event handling) and the close method in the lock. 
The close method shuts down the event processing entirely (which includes 
interrupting any outstanding event processing tasks); no events can be 
triggered afterwards anymore. I'm gonna go ahead and come up with a proposal 
here.
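
A minimal sketch of that idea (illustrative names, not the actual Flink 
classes): events are only triggered under the lock, the handling runs on a 
dedicated single-threaded executor, and close() shuts that executor down so no 
event can fire afterwards.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch of the proposed locking discipline.
class LeaderEventDispatcher implements AutoCloseable {
    private final Object lock = new Object();
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();
    private boolean running = true;

    void onGrantLeadership(Runnable handler) {
        synchronized (lock) {                  // short critical section: trigger only
            if (running) {
                eventThread.execute(handler);  // handling happens outside the lock
            }
        }
    }

    @Override
    public void close() {
        synchronized (lock) {                  // stop accepting new events
            running = false;
        }
        eventThread.shutdownNow();             // interrupt outstanding event tasks
    }
}
{code}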

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and 
> DefaultLeaderElectionService.onGrantLeadership fell into dead lock
> -
>
> Key: FLINK-32311
> URL: https://issues.apache.org/jira/browse/FLINK-32311
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8]
>  
> there are 2 threads one locked {{0xe3a8a1e8}} and waiting for 
> {{0xe3a89c18}}
> {noformat}
> 2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 
> "ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 
> tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry 
> [0x7f94b63e1000]
> 2023-06-08T01:18:54.5609820Z Jun 08 01:18:54java.lang.Thread.State: 
> BLOCKED (on object monitor)
> 2023-06-08T01:18:54.5610557Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425)
> 2023-06-08T01:18:54.5611459Z Jun 08 01:18:54  - waiting to lock 
> <0xe3a89c18> (a java.lang.Object)
> 2023-06-08T01:18:54.5612198Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300)
> 2023-06-08T01:18:54.5613110Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153)
> 2023-06-08T01:18:54.5614070Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$$Lambda$1649/586959400.accept(Unknown
>  Source)
> 2023-06-08T01:18:54.5615014Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.lambda$forEach$0(MappingListenerManager.java:92)
> 2023-06-08T01:18:54.5616259Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1640/1393625763.run(Unknown
>  Source)
> 2023-06-08T01:18:54.5617137Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1633/2012730699.execute(Unknown
>  Source)
> 2023-06-08T01:18:54.5618047Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.forEach(MappingListenerManager.java:89)
> 2023-06-08T01:18:54.5618994Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.StandardListenerManager.forEach(StandardListenerManager.java:89)
> 2023-06-08T01:18:54.5620071Z Jun 08 01:18:54  at 
> 

[GitHub] [flink] hanyuzheng7 commented on pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function

2023-06-12 Thread via GitHub


hanyuzheng7 commented on PR #22717:
URL: https://github.com/apache/flink/pull/22717#issuecomment-1587333488

   @snuyanzin for this case `SELECT array_concat(ARRAY[1], ARRAY[CAST(NULL AS 
INT)]); -- returns [1, 0]`
   other methods such as array_union have the same problem; I have not been able 
to solve it so far.
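
   For what it's worth, a tiny standalone illustration of the suspected cause 
(plain Java, an assumption about the code path rather than the actual Flink 
internals): a null element cannot be represented in a primitive `int[]`, so it 
degrades to the default value 0.

   ```java
   public class NullToZero {
       public static void main(String[] args) {
           Integer[] boxed = {1, null};
           int[] primitive = new int[boxed.length];
           for (int i = 0; i < boxed.length; i++) {
               // unboxing null would throw, so a default has to be substituted
               primitive[i] = boxed[i] == null ? 0 : boxed[i];
           }
           System.out.println(java.util.Arrays.toString(primitive)); // [1, 0]
       }
   }
   ```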


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-32316:

Description: 
When we tried the SourceAlignment feature, we found that a duplicated 
announceCombinedWatermark task will be scheduled after the JobManager fails over 
and automatically recovers the job from a checkpoint.

The reason, I think, is that we should schedule the announceCombinedWatermark 
task in the SourceCoordinator::start function, not in the SourceCoordinator 
constructor (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when the JobManager encounters a failover and automatically recovers 
the job, it will create the SourceCoordinator twice:
 * The first one is created when the JobMaster builds the DefaultExecutionGraph; 
this initializes the first sourceCoordinator but does not start it.
 * The second one is created when the JobMaster calls the 
restoreLatestCheckpointedStateInternal method, which resets the old 
sourceCoordinator and initializes a new one. Because the first sourceCoordinator 
was never started (a SourceCoordinator is started before 
SchedulerBase::startScheduling), it will not be fully closed.

 

  was:
When we tried the SourceAlignment feature, we found that a duplicated 
announceCombinedWatermark task will be scheduled after the JobManager fails over 
and automatically recovers the job from a checkpoint.

The reason, I think, is that we should schedule the announceCombinedWatermark 
task in the SourceCoordinator::start function, not in the SourceCoordinator 
constructor (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when the JobManager encounters a failover and automatically recovers 
the job, it will create the SourceCoordinator twice:
 * The first one is created when the JobMaster builds the DefaultExecutionGraph; 
this initializes the first sourceCoordinator but does not start it.
 * The second one is created when the JobMaster calls the 
restoreLatestCheckpointedStateInternal method, which resets the old 
sourceCoordinator and initializes a new one. Because the first sourceCoordinator 
was never started (a SourceCoordinator is started before 
SchedulerBase::startScheduling), it will not be fully closed.

 


> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we tried the SourceAlignment feature, we found that a duplicated 
> announceCombinedWatermark task will be scheduled after the JobManager fails 
> over and automatically recovers the job from a checkpoint.
> The reason, I think, is that we should schedule the announceCombinedWatermark 
> task in the SourceCoordinator::start function, not in the SourceCoordinator 
> constructor (see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when the JobManager encounters a failover and automatically 
> recovers the job, it will create the SourceCoordinator twice:
>  * The first one is created when the JobMaster builds the 
> DefaultExecutionGraph; this initializes the first sourceCoordinator but does 
> not start it.
>  * The second one is created when the JobMaster calls the 
> restoreLatestCheckpointedStateInternal method, which resets the old 
> sourceCoordinator and initializes a new one. Because the first 
> sourceCoordinator was never started (a SourceCoordinator is started before 
> SchedulerBase::startScheduling), it will not be fully closed.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-32316:

Description: 
When we tried the SourceAlignment feature, we found that a duplicated 
announceCombinedWatermark task will be scheduled after the JobManager fails over 
and automatically recovers the job from a checkpoint.

The reason, I think, is that we should schedule the announceCombinedWatermark 
task in the SourceCoordinator::start function, not in the SourceCoordinator 
constructor (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when the JobManager encounters a failover and automatically recovers 
the job, it will create the SourceCoordinator twice:
 * The first one is created when the JobMaster builds the DefaultExecutionGraph; 
this initializes the first sourceCoordinator but does not start it.
 * The second one is created when the JobMaster calls the 
restoreLatestCheckpointedStateInternal method, which resets the old 
sourceCoordinator and initializes a new one. Because the first sourceCoordinator 
was never started (a SourceCoordinator is started before 
SchedulerBase::startScheduling), it will not be fully closed.

 

  was:
When we tried the SourceAlignment feature, we found that a duplicated 
announceCombinedWatermark task will be scheduled after the JobManager fails over 
and automatically recovers the job from a checkpoint.

The reason, I think, is that we should schedule the announceCombinedWatermark 
task in the SourceCoordinator::start function, not in the SourceCoordinator 
constructor (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when the JobManager encounters a failover and automatically recovers 
the job, it will create the SourceCoordinator twice:
 * The first one is created when the JobMaster builds the DefaultExecutionGraph.
 * The second one is created when the JobMaster calls the 
restoreLatestCheckpointedStateInternal method, which resets the old 
sourceCoordinator and initializes a new one. Because the first sourceCoordinator 
was never started (a SourceCoordinator is started before 
SchedulerBase::startScheduling), it will not be fully closed.

 


> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we tried the SourceAlignment feature, we found that a duplicated 
> announceCombinedWatermark task will be scheduled after the JobManager fails 
> over and automatically recovers the job from a checkpoint.
> The reason, I think, is that we should schedule the announceCombinedWatermark 
> task in the SourceCoordinator::start function, not in the SourceCoordinator 
> constructor (see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when the JobManager encounters a failover and automatically 
> recovers the job, it will create the SourceCoordinator twice:
>  * The first one is created when the JobMaster builds the 
> DefaultExecutionGraph; this initializes the first sourceCoordinator but does 
> not start it.
>  * The second one is created when the JobMaster calls the 
> restoreLatestCheckpointedStateInternal method, which resets the old 
> sourceCoordinator and initializes a new one. Because the first 
> sourceCoordinator was never started (a SourceCoordinator is started before 
> SchedulerBase::startScheduling), it will not be fully closed.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31665) Add ARRAY_CONCAT supported in SQL & Table API

2023-06-12 Thread Mara Steiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731607#comment-17731607
 ] 

Mara Steiu commented on FLINK-31665:


I personally think that ARRAY_CONCAT is the most intuitive option, and the fact 
that other vendors like BQ use it also helps.

> Add ARRAY_CONCAT supported in SQL & Table API
> -
>
> Key: FLINK-31665
> URL: https://issues.apache.org/jira/browse/FLINK-31665
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: jackylau
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> This is an implementation of ARRAY_CONCAT.
> The array_concat() function concatenates at least one array, creating an 
> array that contains all the elements in the first array followed by all the 
> elements in the second array ... followed by all the elements in the n-th array.
> h3. Brief change log
> ARRAY_CONCAT for Table API and SQL
> Syntax:
> {code:java}
> ARRAY_CONCAT(arr1, tail...){code}
> Arguments:
> array: at least one ARRAY to be handled.
> Returns:
> The function returns NULL if any input argument is NULL.
> Examples:
> {code:java}
> Flink SQL> SELECT array_concat(array[1, 2, 3, null, 3], array[3], array[5]); 
> [1, 2, 3, null, 3, 3, 5]{code}
> see also:
> Google Cloud BigQuery: 
> [https://cloud.google.com/bigquery/docs/reference/standard-sql/array_functions#array_concat]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731608#comment-17731608
 ] 

Cai Liuyang commented on FLINK-32316:
-

[~pnowojski] please take a look? Thanks~

> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> 
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.16.0
>Reporter: Cai Liuyang
>Priority: Major
>
> When we tried the SourceAlignment feature, we found that a duplicated 
> announceCombinedWatermark task will be scheduled after the JobManager fails 
> over and automatically recovers the job from a checkpoint.
> The reason, I think, is that we should schedule the announceCombinedWatermark 
> task in the SourceCoordinator::start function, not in the SourceCoordinator 
> constructor (see 
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when the JobManager encounters a failover and automatically 
> recovers the job, it will create the SourceCoordinator twice:
>  * The first one is created when the JobMaster builds the 
> DefaultExecutionGraph.
>  * The second one is created when the JobMaster calls the 
> restoreLatestCheckpointedStateInternal method, which resets the old 
> sourceCoordinator and initializes a new one. Because the first 
> sourceCoordinator was never started (a SourceCoordinator is started before 
> SchedulerBase::startScheduling), it will not be fully closed.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32222) Cassandra Source uses DataInputDeserializer and DataOutputSerializer non public apis

2023-06-12 Thread Etienne Chauchot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-3?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Etienne Chauchot resolved FLINK-3.
--
Fix Version/s: cassandra-3.2.0
   Resolution: Fixed

merged master: 4f1cdfa
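
For context, a hedged sketch of the direction (the field names are illustrative, 
not the actual CassandraSplit layout): a split serializer can rely on plain 
java.io streams instead of the Flink-internal 
DataInputDeserializer/DataOutputSerializer.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigInteger;

// Illustrative only; the real CassandraSplitSerializer may differ.
final class SplitBytes {
    static byte[] serialize(BigInteger ringRangeStart, BigInteger ringRangeEnd) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeBigInteger(out, ringRangeStart);
            writeBigInteger(out, ringRangeEnd);
        }
        return bytes.toByteArray();
    }

    private static void writeBigInteger(DataOutputStream out, BigInteger value) throws IOException {
        byte[] raw = value.toByteArray();
        out.writeInt(raw.length);  // length-prefixed encoding
        out.write(raw);
    }

    static BigInteger[] deserialize(byte[] serialized) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return new BigInteger[] {readBigInteger(in), readBigInteger(in)};
        }
    }

    private static BigInteger readBigInteger(DataInputStream in) throws IOException {
        byte[] raw = new byte[in.readInt()];
        in.readFully(raw);
        return new BigInteger(raw);
    }
}
{code}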

> Cassandra Source uses DataInputDeserializer and DataOutputSerializer non 
> public apis
> 
>
> Key: FLINK-3
> URL: https://issues.apache.org/jira/browse/FLINK-3
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: Etienne Chauchot
>Assignee: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
> Fix For: cassandra-3.2.0
>
>
> in class _CassandraSplitSerializer_, the usage of these non-public APIs 
> violates _ConnectorRules#CONNECTOR_CLASSES_ONLY_DEPEND_ON_PUBLIC_API_



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-31806) Prod architecture tests didn't detect non-public API usage

2023-06-12 Thread Etienne Chauchot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Etienne Chauchot resolved FLINK-31806.
--
Fix Version/s: 1.18.0
   Resolution: Fixed

merged: 28abe81

> Prod architecture tests didn't detect non-public API usage
> --
>
> Key: FLINK-31806
> URL: https://issues.apache.org/jira/browse/FLINK-31806
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Cassandra, Tests
>Affects Versions: cassandra-3.0.0, 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> FLINK-31805 wasn't detected by the production architecture tests.
> Not sure if this is an issue on the cassandra or Flink side.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-32316:
---

 Summary: Duplicated announceCombinedWatermark task maybe scheduled 
if jobmanager failover
 Key: FLINK-32316
 URL: https://issues.apache.org/jira/browse/FLINK-32316
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.0
Reporter: Cai Liuyang


When we tried the SourceAlignment feature, we found that a duplicated 
announceCombinedWatermark task may be scheduled after a JobManager failover, 
when the job is automatically recovered from a checkpoint.

I think the reason is that the announceCombinedWatermark task should be 
scheduled in the SourceCoordinator::start function rather than in the 
SourceCoordinator constructor (see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when the JobManager encounters a failover and automatically 
recovers the job, it creates the SourceCoordinator twice:
 * The first time, when the JobMaster is created and builds the 
DefaultExecutionGraph.
 * The second time, when the JobMaster calls the 
restoreLatestCheckpointedStateInternal method, which resets the old 
SourceCoordinator and initializes a new one. Because the first 
SourceCoordinator was never started (SourceCoordinators are started only 
right before SchedulerBase::startScheduling), it is never fully closed.
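For illustration, a minimal sketch of the proposed change (hypothetical names, 
not the actual SourceCoordinator code): the periodic task is scheduled in 
start() rather than in the constructor, so a coordinator that is constructed 
but never started schedules nothing and leaves nothing behind to close.
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class CoordinatorSketch implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> announceTask;

    CoordinatorSketch() {
        // intentionally empty: nothing is scheduled at construction time,
        // so the throwaway first instance created during failover stays inert
    }

    void start() {
        // only a coordinator that is actually started announces watermarks
        announceTask = executor.scheduleAtFixedRate(
                this::announceCombinedWatermark, 0, 1, TimeUnit.SECONDS);
    }

    private void announceCombinedWatermark() {
        // combine subtask watermarks and announce the result
    }

    @Override
    public void close() {
        if (announceTask != null) {
            announceTask.cancel(true);
        }
        executor.shutdownNow();
    }
}
{code}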

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32315) Support local file upload in K8s mode

2023-06-12 Thread Paul Lin (Jira)
Paul Lin created FLINK-32315:


 Summary: Support local file upload in K8s mode
 Key: FLINK-32315
 URL: https://issues.apache.org/jira/browse/FLINK-32315
 Project: Flink
  Issue Type: New Feature
  Components: Client / Job Submission, Deployment / Kubernetes
Reporter: Paul Lin


Currently, Flink assumes all resources are locally accessible in the pods, 
which requires users to prepare the resources by mounting storages, downloading 
resources with init containers, or rebuilding images for each execution.

We could make things much easier by introducing a built-in file distribution 
mechanism based on Flink-supported filesystems. It's implemented in two steps:

 
1. KubernetesClusterDescriptor uploads all local resources to remote storage 
via the Flink filesystem abstraction (skipped if the resources are already remote).
2. KubernetesApplicationClusterEntrypoint and KubernetesTaskExecutorRunner 
download the resources and put them in the classpath during startup.
 
The 2nd step is mostly done by 
[FLINK-28915|https://issues.apache.org/jira/browse/FLINK-28915], so this 
issue focuses on the upload part; a sketch of that step follows.
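A minimal sketch of the upload step, assuming Flink's 
org.apache.flink.core.fs.FileSystem abstraction; the class and method names 
here are illustrative, not the actual KubernetesClusterDescriptor code:
{code:java}
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

final class LocalResourceUploader {
    /** Copies a local file to the remote dir and returns the remote path. */
    static Path upload(File localFile, Path remoteDir) throws IOException {
        Path target = new Path(remoteDir, localFile.getName());
        // resolves s3://, hdfs://, oss://, ... via the Flink FileSystem plugins
        FileSystem fs = target.getFileSystem();
        try (InputStream in = new FileInputStream(localFile);
                FSDataOutputStream out =
                        fs.create(target, FileSystem.WriteMode.NO_OVERWRITE)) {
            in.transferTo(out); // stream the local bytes to remote storage
        }
        return target; // URI to reference from the generated pod spec
    }
}
{code}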
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22764: [FLINK-32314][rpc] Ignore classloading errors after actorsystem shutdown

2023-06-12 Thread via GitHub


flinkbot commented on PR #22764:
URL: https://github.com/apache/flink/pull/22764#issuecomment-1587277700

   
   ## CI report:
   
   * 95de63ef5c02258d5f8fff9654daada0b49b2652 UNKNOWN
   
   
   Bot commands:
   The @flinkbot bot supports the following commands:

   - `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol opened a new pull request, #22764: [FLINK-32314][rpc] Ignore classloading errors after actorsystem shutdown

2023-06-12 Thread via GitHub


zentol opened a new pull request, #22764:
URL: https://github.com/apache/flink/pull/22764

   Stray akka/netty threads can run into classloading errors because, for some 
reason, they are not shut down despite the actor system having shut down.
   This change ignores errors in this specific situation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32314) Ignore class-loading errors after RPC system shutdown

2023-06-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32314:
---
Labels: pull-request-available  (was: )

> Ignore class-loading errors after RPC system shutdown
> -
>
> Key: FLINK-32314
> URL: https://issues.apache.org/jira/browse/FLINK-32314
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / RPC, Tests
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> In tests we occasionally see the akka rpc service throwing class loading 
> errors _after_ it was shut down.
> AFAICT our shutdown procedure is correct, and it's just akka shutting down 
> some things asynchronously.
> I couldn't figure out why/what is still running, so as a bandaid I suggest 
> ignoring classloading errors after the rpc service shutdown has completed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32314) Ignore class-loading errors after RPC system shutdown

2023-06-12 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32314:


 Summary: Ignore class-loading errors after RPC system shutdown
 Key: FLINK-32314
 URL: https://issues.apache.org/jira/browse/FLINK-32314
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / RPC, Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


In tests we occasionally see the akka rpc service throwing class loading errors 
_after_ it was shut down.
AFAICT our shutdown procedure is correct, and it's just akka shutting down some 
things asynchronously.
I couldn't figure out why/what is still running, so as a bandaid I suggest 
ignoring classloading errors after the rpc service shutdown has completed.
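A minimal sketch (not the actual change) of such a bandaid: an error handler 
that swallows classloading errors only once the RPC service has completed its 
shutdown, and still surfaces everything else.
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

final class ShutdownAwareErrorHandler implements Thread.UncaughtExceptionHandler {
    private final AtomicBoolean rpcServiceTerminated = new AtomicBoolean(false);
    private final AtomicReference<Throwable> firstRealError = new AtomicReference<>();

    void markRpcServiceTerminated() {
        rpcServiceTerminated.set(true); // called after shutdown has completed
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        if (rpcServiceTerminated.get() && e instanceof NoClassDefFoundError) {
            // stray akka/netty thread raced past the shutdown: log and ignore
            System.err.println("Ignoring classloading error after RPC shutdown: " + e);
            return;
        }
        // anything else is still a genuine failure to report to the test
        firstRealError.compareAndSet(null, e);
    }

    Throwable firstRealError() {
        return firstRealError.get();
    }
}
{code}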



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-jdbc] snuyanzin opened a new pull request, #56: [FLINK-32313] Remove usage of flink-shaded from the code, add deprecation on checkstyle level

2023-06-12 Thread via GitHub


snuyanzin opened a new pull request, #56:
URL: https://github.com/apache/flink-connector-jdbc/pull/56

   After this change, any code importing `flink-shaded` should fail the 
checkstyle task at compile time; a sketch of such a rule is below.
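   A rough sketch of that kind of rule, assuming Checkstyle's `IllegalImport` 
check (the module's actual placement and wording in the connector's 
checkstyle.xml may differ):

   ```xml
   <module name="TreeWalker">
     <!-- fail the build on any import from the relocated flink-shaded packages -->
     <module name="IllegalImport">
       <property name="illegalPkgs" value="org.apache.flink.shaded"/>
     </module>
   </module>
   ```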


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32313) CrateDB relies on flink-shaded in flink-connector-jdbc

2023-06-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32313:
---
Labels: pull-request-available  (was: )

> CrateDB relies on flink-shaded in flink-connector-jdbc
> --
>
> Key: FLINK-32313
> URL: https://issues.apache.org/jira/browse/FLINK-32313
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Reporter: Martijn Visser
>Priority: Blocker
>  Labels: pull-request-available
>
> See 
> https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java#L27
>  - JDBC shouldn't rely on flink-shaded. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32308) RestClusterClient submit job to remote cluster

2023-06-12 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-32308.
--
Resolution: Invalid

[~SpongebobZ] This request for help should be asked via the User mailing list, 
Stack Overflow, or Slack. Jira is for bugs or new feature requests.

> RestClusterClient submit job to remote cluster
> --
>
> Key: FLINK-32308
> URL: https://issues.apache.org/jira/browse/FLINK-32308
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Not a Priority
>
> I just used `RestClusterClient` submit job to remote cluster, but out of my 
> expectation it submitted to local cluster instead. Could you help with me
> {code:java}
> String host = "x.x.x.x";
> int port = 8081;
> Configuration flinkConfiguration = new Configuration();
> flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
> flinkConfiguration.setInteger(JobManagerOptions.PORT, 6123);
> flinkConfiguration.setInteger(RestOptions.PORT, port);
> RestClusterClient<StandaloneClusterId> clusterClient = new 
> RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance());
> String s = clusterClient.getWebInterfaceURL();
> List<URL> extraJars = new ArrayList<>();
> extraJars.add(new File("C:\\Users\\extend.jar").toURI().toURL());
> PackagedProgram packagedProgram = PackagedProgram.newBuilder()
> .setConfiguration(flinkConfiguration)
> .setJarFile(new File("F:\\data.jar"))
> .setEntryPointClassName("MyApplication")
> .setUserClassPaths(extraJars)
> .build();
> JobID jobID = JobID.generate();
> JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, 
> flinkConfiguration, 2, jobID, false);
> clusterClient.submitJob(jobGraph);
> System.out.println(jobID); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock

2023-06-12 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reassigned FLINK-32311:
-

Assignee: Matthias Pohl

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and 
> DefaultLeaderElectionService.onGrantLeadership fell into dead lock
> -
>
> Key: FLINK-32311
> URL: https://issues.apache.org/jira/browse/FLINK-32311
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8]
>  
> there are 2 threads: one has locked {{0xe3a8a1e8}} and is waiting for 
> {{0xe3a89c18}}
> {noformat}
> 2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 
> "ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 
> tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry 
> [0x7f94b63e1000]
> 2023-06-08T01:18:54.5609820Z Jun 08 01:18:54java.lang.Thread.State: 
> BLOCKED (on object monitor)
> 2023-06-08T01:18:54.5610557Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425)
> 2023-06-08T01:18:54.5611459Z Jun 08 01:18:54  - waiting to lock 
> <0xe3a89c18> (a java.lang.Object)
> 2023-06-08T01:18:54.5612198Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300)
> 2023-06-08T01:18:54.5613110Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153)
> 2023-06-08T01:18:54.5614070Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$$Lambda$1649/586959400.accept(Unknown
>  Source)
> 2023-06-08T01:18:54.5615014Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.lambda$forEach$0(MappingListenerManager.java:92)
> 2023-06-08T01:18:54.5616259Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1640/1393625763.run(Unknown
>  Source)
> 2023-06-08T01:18:54.5617137Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1633/2012730699.execute(Unknown
>  Source)
> 2023-06-08T01:18:54.5618047Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.forEach(MappingListenerManager.java:89)
> 2023-06-08T01:18:54.5618994Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.StandardListenerManager.forEach(StandardListenerManager.java:89)
> 2023-06-08T01:18:54.5620071Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:711)
> 2023-06-08T01:18:54.5621198Z Jun 08 01:18:54  - locked <0xe3a8a1e8> 
> (a 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch)
> 2023-06-08T01:18:54.5622072Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.checkLeadership(LeaderLatch.java:597)
> 2023-06-08T01:18:54.5622991Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.access$600(LeaderLatch.java:64)
> 2023-06-08T01:18:54.5623988Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$7.processResult(LeaderLatch.java:648)
> 2023-06-08T01:18:54.5624965Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926)
> 2023-06-08T01:18:54.5626218Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683)
> 2023-06-08T01:18:54.5627369Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
> 2023-06-08T01:18:54.5628353Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetChildrenBuilderImpl$2.processResult(GetChildrenBuilderImpl.java:187)
> 
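For readers unfamiliar with this failure mode: the trace is a classic 
lock-ordering inversion. A stripped-down illustration (hypothetical classes, 
not Flink's actual code), where two threads take the same two monitors in 
opposite order:
{code:java}
final class DeadlockSketch {
    private final Object latchMonitor = new Object(); // stands in for the LeaderLatch
    private final Object serviceLock = new Object();  // stands in for the service's lock

    // curator event thread: latch monitor first, then service lock
    void grantFromLatch() {
        synchronized (latchMonitor) {
            synchronized (serviceLock) {
                // deliver leadership to the election service
            }
        }
    }

    // service-side thread: service lock first, then latch monitor
    void queryFromService() {
        synchronized (serviceLock) {
            synchronized (latchMonitor) {
                // inspect or close the latch
            }
        }
    }
    // if both methods run concurrently, each thread can end up holding one
    // monitor while blocked on the other -- exactly the BLOCKED state above
}
{code}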

[jira] [Created] (FLINK-32313) CrateDB relies on flink-shaded in flink-connector-jdbc

2023-06-12 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-32313:
--

 Summary: CrateDB relies on flink-shaded in flink-connector-jdbc
 Key: FLINK-32313
 URL: https://issues.apache.org/jira/browse/FLINK-32313
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Reporter: Martijn Visser


See 
https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java#L27
 - JDBC shouldn't rely on flink-shaded. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32313) CrateDB relies on flink-shaded in flink-connector-jdbc

2023-06-12 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731579#comment-17731579
 ] 

Martijn Visser commented on FLINK-32313:


[~matriv] This is a blocker for JDBC, can you make sure that no Flink-Shaded is 
used by CrateDB? 

> CrateDB relies on flink-shaded in flink-connector-jdbc
> --
>
> Key: FLINK-32313
> URL: https://issues.apache.org/jira/browse/FLINK-32313
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Reporter: Martijn Visser
>Priority: Blocker
>
> See 
> https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java#L27
>  - JDBC shouldn't rely on flink-shaded. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32312) SSLConnectionSocketFactory produced no output for 900 seconds

2023-06-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731578#comment-17731578
 ] 

Sergey Nuyanzin commented on FLINK-32312:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49645=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=15678

> SSLConnectionSocketFactory produced no output for 900 seconds
> -
>
> Key: FLINK-32312
> URL: https://issues.apache.org/jira/browse/FLINK-32312
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49688=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=15175
> {noformat}
> "main" #1 prio=5 os_prio=0 tid=0x7fb46c00b800 nid=0x184 runnable 
> [0x7fb473251000]
> Jun 06 09:53:49java.lang.Thread.State: RUNNABLE
> Jun 06 09:53:49   at java.net.PlainSocketImpl.socketConnect(Native Method)
> Jun 06 09:53:49   at 
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> Jun 06 09:53:49   at 
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> Jun 06 09:53:49   at 
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> Jun 06 09:53:49   at 
> java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> Jun 06 09:53:49   at java.net.Socket.connect(Socket.java:607)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.httpclient.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.httpclient.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.httpclient.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.httpclient.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RetryExec.execute(RetryExec.java:89)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.httpclient.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.httpclient.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.execute(AbstractHttpClientWagon.java:1005)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.fillInputData(AbstractHttpClientWagon.java:1162)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.fillInputData(AbstractHttpClientWagon.java:1140)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.StreamWagon.getInputStream(StreamWagon.java:126)
> Jun 06 09:53:49   at 
> org.apache.maven.wagon.StreamWagon.getIfNewer(StreamWagon.java:88)
> ...
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32312) SSLConnectionSocketFactory produced no output for 900 seconds

2023-06-12 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32312:
---

 Summary: SSLConnectionSocketFactory produced no output for 900 
seconds
 Key: FLINK-32312
 URL: https://issues.apache.org/jira/browse/FLINK-32312
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.18.0
Reporter: Sergey Nuyanzin


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49688=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=15175
{noformat}
"main" #1 prio=5 os_prio=0 tid=0x7fb46c00b800 nid=0x184 runnable 
[0x7fb473251000]
Jun 06 09:53:49java.lang.Thread.State: RUNNABLE
Jun 06 09:53:49 at java.net.PlainSocketImpl.socketConnect(Native Method)
Jun 06 09:53:49 at 
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
Jun 06 09:53:49 at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
Jun 06 09:53:49 at 
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
Jun 06 09:53:49 at 
java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
Jun 06 09:53:49 at java.net.Socket.connect(Socket.java:607)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RetryExec.execute(RetryExec.java:89)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.execute(AbstractHttpClientWagon.java:1005)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.fillInputData(AbstractHttpClientWagon.java:1162)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.fillInputData(AbstractHttpClientWagon.java:1140)
Jun 06 09:53:49 at 
org.apache.maven.wagon.StreamWagon.getInputStream(StreamWagon.java:126)
Jun 06 09:53:49 at 
org.apache.maven.wagon.StreamWagon.getIfNewer(StreamWagon.java:88)

...
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32299) Upload python jar when sql contains python udf jar

2023-06-12 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang reassigned FLINK-32299:
-

Assignee: FangYong

> Upload python jar when sql contains python udf jar
> --
>
> Key: FLINK-32299
> URL: https://issues.apache.org/jira/browse/FLINK-32299
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway, Table SQL / Runtime
>Reporter: Shengkai Fang
>Assignee: FangYong
>Priority: Major
>
> Currently, sql gateway always uploads the python jar when submitting jobs. 
> However, it's not required for every sql job. We should add the python jar 
> to PipelineOptions.JARS only when the user's job contains a python udf.
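A minimal sketch of the intended behavior (the udf-detection flag and jar 
location are hypothetical stand-ins, not the gateway's actual code):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

import java.util.ArrayList;
import java.util.List;

final class PythonJarAttacher {
    static void attachIfNeeded(
            Configuration conf, boolean jobContainsPythonUdf, String pythonJarUri) {
        if (!jobContainsPythonUdf) {
            return; // plain SQL job: do not ship the python jar
        }
        // copy the existing pipeline jars, if any, before mutating
        List<String> jars = conf.getOptional(PipelineOptions.JARS)
                .map(ArrayList::new)
                .orElseGet(ArrayList::new);
        if (!jars.contains(pythonJarUri)) {
            jars.add(pythonJarUri);
        }
        conf.set(PipelineOptions.JARS, jars);
    }
}
{code}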



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32309) Shared classpaths and jars manager for jobs in sql gateway cause confliction

2023-06-12 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang reassigned FLINK-32309:
-

Assignee: FangYong

> Shared classpaths and jars manager for jobs in sql gateway cause confliction
> 
>
> Key: FLINK-32309
> URL: https://issues.apache.org/jira/browse/FLINK-32309
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: FangYong
>Priority: Major
>
> Currently, all jobs in the same session of the sql gateway share the resource 
> manager that provides the classpath for jobs. After a job has run, its 
> classpath and jars remain in the shared resource manager and are used by the 
> next jobs. This may put many unnecessary jars on a job's classpath or even 
> cause conflicts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27284) KafkaSinkITCase$IntegrationTests.testScaleUp failed on azures due to failed to create topic

2023-06-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731575#comment-17731575
 ] 

Sergey Nuyanzin commented on FLINK-27284:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49702=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203=36363

> KafkaSinkITCase$IntegrationTests.testScaleUp failed on azures due to failed 
> to create topic
> ---
>
> Key: FLINK-27284
> URL: https://issues.apache.org/jira/browse/FLINK-27284
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.16.0, 1.17.2
>Reporter: Yun Gao
>Assignee: Qingsheng Ren
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> {code:java}
> 2022-04-17T06:38:39.4884418Z Apr 17 06:38:39 [ERROR] Tests run: 10, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 97.71 s <<< FAILURE! - in 
> org.apache.flink.connector.kafka.sink.KafkaSinkITCase$IntegrationTests
> 2022-04-17T06:38:39.4885911Z Apr 17 06:38:39 [ERROR] 
> org.apache.flink.connector.kafka.sink.KafkaSinkITCase$IntegrationTests.testScaleUp(TestEnvironment,
>  DataStreamSinkExternalContext, CheckpointingMode)[2]  Time elapsed: 30.115 s 
>  <<< ERROR!
> 2022-04-17T06:38:39.4887050Z Apr 17 06:38:39 java.lang.RuntimeException: 
> Cannot create topic 'kafka-single-topic-4486440447887382037'
> 2022-04-17T06:38:39.4889332Z Apr 17 06:38:39  at 
> org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContext.createTopic(KafkaSinkExternalContext.java:108)
> 2022-04-17T06:38:39.4891038Z Apr 17 06:38:39  at 
> org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContext.createSink(KafkaSinkExternalContext.java:136)
> 2022-04-17T06:38:39.4892936Z Apr 17 06:38:39  at 
> org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.tryCreateSink(SinkTestSuiteBase.java:567)
> 2022-04-17T06:38:39.4894388Z Apr 17 06:38:39  at 
> org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.restartFromSavepoint(SinkTestSuiteBase.java:258)
> 2022-04-17T06:38:39.4895903Z Apr 17 06:38:39  at 
> org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.testScaleUp(SinkTestSuiteBase.java:201)
> 2022-04-17T06:38:39.4897144Z Apr 17 06:38:39  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-04-17T06:38:39.4898432Z Apr 17 06:38:39  at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-04-17T06:38:39.4899803Z Apr 17 06:38:39  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-04-17T06:38:39.4900985Z Apr 17 06:38:39  at 
> java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 2022-04-17T06:38:39.4902266Z Apr 17 06:38:39  at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> 2022-04-17T06:38:39.4903521Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> 2022-04-17T06:38:39.4904835Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> 2022-04-17T06:38:39.4906422Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> 2022-04-17T06:38:39.4907505Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
> 2022-04-17T06:38:39.4908355Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
> 2022-04-17T06:38:39.4909242Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> 2022-04-17T06:38:39.4910144Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> 2022-04-17T06:38:39.4911103Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> 2022-04-17T06:38:39.4912013Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> 2022-04-17T06:38:39.4913109Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> 2022-04-17T06:38:39.4913983Z Apr 17 06:38:39  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> 

[jira] [Commented] (FLINK-32036) TableEnvironmentTest.test_explain is unstable on azure ci

2023-06-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731573#comment-17731573
 ] 

Sergey Nuyanzin commented on FLINK-32036:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49702=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=24547

> TableEnvironmentTest.test_explain is unstable on azure ci
> -
>
> Key: FLINK-32036
> URL: https://issues.apache.org/jira/browse/FLINK-32036
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.17.1
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> it fails on ci (1.17 branch so far)
> {noformat}
> May 07 01:51:35 === FAILURES 
> ===
> May 07 01:51:35 __ TableEnvironmentTest.test_explain 
> ___
> May 07 01:51:35 
> May 07 01:51:35 self = 
>  testMethod=test_explain>
> May 07 01:51:35 
> May 07 01:51:35 def test_explain(self):
> May 07 01:51:35 schema = RowType() \
> May 07 01:51:35 .add('a', DataTypes.INT()) \
> May 07 01:51:35 .add('b', DataTypes.STRING()) \
> May 07 01:51:35 .add('c', DataTypes.STRING())
> May 07 01:51:35 t_env = self.t_env
> May 07 01:51:35 t = t_env.from_elements([], schema)
> May 07 01:51:35 result = t.select(t.a + 1, t.b, t.c)
> May 07 01:51:35 
> May 07 01:51:35 >   actual = result.explain()
> May 07 01:51:35 
> May 07 01:51:35 pyflink/table/tests/test_table_environment_api.py:66
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48766=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=25029



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP

2023-06-12 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-32049:

Affects Version/s: 1.18.0

> CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
> ---
>
> Key: FLINK-32049
> URL: https://issues.apache.org/jira/browse/FLINK-32049
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> CoordinatedSourceRescaleITCase.testDownscaling fails with
> {noformat}
> May 08 03:19:14 [ERROR] Failures: 
> May 08 03:19:14 [ERROR]   
> CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 
> May 08 03:19:14 Multiple Failures (1 failure)
> May 08 03:19:14 -- failure 1 --
> May 08 03:19:14 [Any cause contains message 'successfully restored 
> checkpoint'] 
> May 08 03:19:14 Expecting any element of:
> May 08 03:19:14   [org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is 
> suppressed by NoRestartBackoffTimeStrategy
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
> May 08 03:19:14   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
> May 08 03:19:14   ...(35 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 java.lang.IllegalStateException: This executor has been 
> registered.
> May 08 03:19:14   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
> May 08 03:19:14   ...(17 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)]
> May 08 03:19:14 to satisfy the given assertions requirements but none did:
> May 08 03:19:14 
> May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)
> May 08 03:19:14 error: 
> May 08 03:19:14 Expecting throwable message:
> May 08 03:19:14   "Job execution failed."
> May 08 03:19:14 to contain:
> May 08 03:19:14   "successfully restored checkpoint"
> May 08 03:19:14 but did not.
> May 08 03:19:14 
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772=logs=fc7981dc-d266-55b0-5fff-f0d0a2294e36=1a9b228a-3e0e-598f-fc81-c321539dfdbf=7191



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP

2023-06-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731571#comment-17731571
 ] 

Sergey Nuyanzin commented on FLINK-32049:
-

Same for testUpscaling
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=fc7981dc-d266-55b0-5fff-f0d0a2294e36=1a9b228a-3e0e-598f-fc81-c321539dfdbf=7575

> CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
> ---
>
> Key: FLINK-32049
> URL: https://issues.apache.org/jira/browse/FLINK-32049
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.1
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> CoordinatedSourceRescaleITCase.testDownscaling fails with
> {noformat}
> May 08 03:19:14 [ERROR] Failures: 
> May 08 03:19:14 [ERROR]   
> CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107 
> May 08 03:19:14 Multiple Failures (1 failure)
> May 08 03:19:14 -- failure 1 --
> May 08 03:19:14 [Any cause contains message 'successfully restored 
> checkpoint'] 
> May 08 03:19:14 Expecting any element of:
> May 08 03:19:14   [org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 org.apache.flink.runtime.JobException: Recovery is 
> suppressed by NoRestartBackoffTimeStrategy
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> May 08 03:19:14   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
> May 08 03:19:14   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
> May 08 03:19:14   ...(35 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14 java.lang.IllegalStateException: This executor has been 
> registered.
> May 08 03:19:14   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
> May 08 03:19:14   at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
> May 08 03:19:14   ...(17 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)]
> May 08 03:19:14 to satisfy the given assertions requirements but none did:
> May 08 03:19:14 
> May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> May 08 03:19:14   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14   ...(45 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)
> May 08 03:19:14 error: 
> May 08 03:19:14 Expecting throwable message:
> May 08 03:19:14   "Job execution failed."
> May 08 03:19:14 to contain:
> May 08 03:19:14   "successfully restored checkpoint"
> May 08 03:19:14 but did not.
> May 08 03:19:14 
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772=logs=fc7981dc-d266-55b0-5fff-f0d0a2294e36=1a9b228a-3e0e-598f-fc81-c321539dfdbf=7191



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-06-12 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-28440:

Affects Version/s: 1.18.0

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0, 1.18.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Fix For: 1.18.0, 1.16.3
>
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, 
> image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at 

[jira] [Commented] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2023-06-12 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731569#comment-17731569
 ] 

Sergey Nuyanzin commented on FLINK-28440:
-

Redirected here from FLINK-30107
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=8702

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Fix For: 1.18.0, 1.16.3
>
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, 
> image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: 

[jira] [Updated] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock

2023-06-12 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-32311:

Fix Version/s: (was: 1.18.0)

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and 
> DefaultLeaderElectionService.onGrantLeadership fell into dead lock
> -
>
> Key: FLINK-32311
> URL: https://issues.apache.org/jira/browse/FLINK-32311
> Project: Flink
>  Issue Type: Bug
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8]
>  
> there are 2 threads: one has locked {{0xe3a8a1e8}} and is waiting for 
> {{0xe3a89c18}}
> {noformat}
> 2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 
> "ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 
> tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry 
> [0x7f94b63e1000]
> 2023-06-08T01:18:54.5609820Z Jun 08 01:18:54java.lang.Thread.State: 
> BLOCKED (on object monitor)
> 2023-06-08T01:18:54.5610557Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425)
> 2023-06-08T01:18:54.5611459Z Jun 08 01:18:54  - waiting to lock 
> <0xe3a89c18> (a java.lang.Object)
> 2023-06-08T01:18:54.5612198Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300)
> 2023-06-08T01:18:54.5613110Z Jun 08 01:18:54  at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153)
> 2023-06-08T01:18:54.5614070Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$$Lambda$1649/586959400.accept(Unknown
>  Source)
> 2023-06-08T01:18:54.5615014Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.lambda$forEach$0(MappingListenerManager.java:92)
> 2023-06-08T01:18:54.5616259Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1640/1393625763.run(Unknown
>  Source)
> 2023-06-08T01:18:54.5617137Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1633/2012730699.execute(Unknown
>  Source)
> 2023-06-08T01:18:54.5618047Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.forEach(MappingListenerManager.java:89)
> 2023-06-08T01:18:54.5618994Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.StandardListenerManager.forEach(StandardListenerManager.java:89)
> 2023-06-08T01:18:54.5620071Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:711)
> 2023-06-08T01:18:54.5621198Z Jun 08 01:18:54  - locked <0xe3a8a1e8> 
> (a 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch)
> 2023-06-08T01:18:54.5622072Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.checkLeadership(LeaderLatch.java:597)
> 2023-06-08T01:18:54.5622991Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.access$600(LeaderLatch.java:64)
> 2023-06-08T01:18:54.5623988Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$7.processResult(LeaderLatch.java:648)
> 2023-06-08T01:18:54.5624965Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926)
> 2023-06-08T01:18:54.5626218Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683)
> 2023-06-08T01:18:54.5627369Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
> 2023-06-08T01:18:54.5628353Z Jun 08 01:18:54  at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetChildrenBuilderImpl$2.processResult(GetChildrenBuilderImpl.java:187)
> 2023-06-08T01:18:54.5629281Z Jun 08 01:18:54  at 
> 
