[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #589: [hotfix][docs] Fix flink version in doc and yaml

2023-05-09 Thread via GitHub


gyfora commented on PR #589:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/589#issuecomment-1541393752

   > Why not upgrade to the latest version: 1.17 directly?
   
   @X-czh @yangjf2019 we usually update the examples, clients, etc. once the 
first patch version is released (1.17.1), so I think 1.16 is fine for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] tamirsagi commented on pull request #589: [hotfix][docs] Fix flink version in doc and yaml

2023-05-09 Thread via GitHub


tamirsagi commented on PR #589:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/589#issuecomment-1541389257

   Hey,
   
   If you are updating the documents, there is another place to update:
   
   
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/plugins/#custom-flink-resource-listeners
   
   As for #2, it should be 
`org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener`.
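
   For context, that docs page describes packaging a listener as an operator
plugin and registering it in the operator configuration. A minimal sketch of
the registration, assuming the documented key pattern; the listener name
`my-listener` and the implementation class `com.example.MyResourceListener`
are hypothetical placeholders:

   ```yaml
   # Hedged sketch: operator configuration registering a custom resource listener.
   # The implementation class (a placeholder here) must implement
   # org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener
   # and its jar must be placed in the operator image's plugins directory.
   kubernetes.operator.plugins.listeners.my-listener.class: com.example.MyResourceListener
   ```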
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32041) flink-kubernetes-operator RoleBinding for Leases not created in correct namespace when using watchNamespaces

2023-05-09 Thread Tamir Sagi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721192#comment-17721192
 ] 

Tamir Sagi edited comment on FLINK-32041 at 5/10/23 5:44 AM:
-

Hey Gyula,

I also encountered something similar  (HA is enabled).

 
I checked the RoleBinding between the service account 
`dev-0-flink-clusters:dev-0-xsight-flink-operator-sa` and the corresponding 
role (*flink-operator*), which was created by the operator with 
`rbac.nodesRule.create=true`; they both look fine.

 

The operator watches 2 namespaces:
 # its own:  dev-0-flink-clusters
 # dev-0-flink-temp-clusters

[two attached screenshots show the Role and RoleBinding created by the operator]

Then the following error is thrown:

{{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure
executing: GET at: https://172.20.0.1/api/v1/nodes. Message:
Forbidden! Configured service account doesn't have access. Service account may
have been revoked. nodes is forbidden: User
"system:serviceaccount:dev-0-flink-clusters:dev-0-xsight-flink-operator-sa"
cannot list resource "nodes" in API group "" at the cluster scope.}}

Could it be related to `kubernetes.rest-service.exposed.type`?
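
Editorial note on the error above: `Node` is a cluster-scoped resource, so a
namespaced Role/RoleBinding in the watched namespaces cannot grant `list
nodes`; that takes a ClusterRole bound through a ClusterRoleBinding. A minimal
sketch under that assumption (resource names are illustrative, not the
operator Helm chart's actual manifests):

```yaml
# Hedged sketch: cluster-scoped RBAC required to list nodes.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: flink-operator-nodes
rules:
  - apiGroups: [""]            # core API group, matching the error message
    resources: ["nodes"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: flink-operator-nodes
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: flink-operator-nodes
subjects:
  - kind: ServiceAccount
    name: dev-0-xsight-flink-operator-sa
    namespace: dev-0-flink-clusters
```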


was (Author: JIRAUSER283777):
Hey Gyula,

I also encountered something similar  (HA is enabled).

 
I checked the RoleBinding between the service account 
`dev-0-flink-clusters:dev-0-xsight-flink-operator-sa` and the corresponding 
role (*flink-operator*), which was created by the operator with 
`rbac.nodesRule.create=true`; they both look fine.

 

The operator watches 2 namespaces:
 # its own:  dev-0-flink-clusters
 # dev-0-flink-temp-clusters

[two attached screenshots show the Role and RoleBinding created by the operator]

Then I see:

{{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure
executing: GET at: https://172.20.0.1/api/v1/nodes. Message:
Forbidden! Configured service account doesn't have access. Service account may
have been revoked. nodes is forbidden: User
"system:serviceaccount:dev-0-flink-clusters:dev-0-xsight-flink-operator-sa"
cannot list resource "nodes" in API group "" at the cluster scope.}}

Could it be related to `kubernetes.rest-service.exposed.type`?

> flink-kubernetes-operator RoleBinding for Leases not created in correct 
> namespace when using watchNamespaces
> 
>
> Key: FLINK-32041
> URL: https://issues.apache.org/jira/browse/FLINK-32041
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Andrew Otto
>Assignee: Andrew Otto
>Priority: Major
>
> When enabling [HA for 
> flink-kubernetes-operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability]
>  RBAC rules must be created to allow the flink-operator to manage k8s Lease 
> resources. When not using {{watchNamespaces}}, the RBAC rules are created at 
> the k8s cluster scope, giving the flink-operator ServiceAccount the ability 
> to manage all needed k8s resources in all namespaces.
> However, when using {{watchNamespaces}}, RBAC rules are only created in the 
> {{watchNamespaces}}. For most rules this is correct, as the operator needs 
> to manage resources like Flink pods and deployments in the {{watchNamespaces}}.
> However, for flink-kubernetes-operator HA, the Lease resource is managed in 
> the same namespace in which the operator is deployed.
> The Helm chart should be fixed so that the proper RBAC rules for Leases are 
> created for the operator's ServiceAccount in the operator's namespace.
> Mailing list discussion 
> [here.|https://lists.apache.org/thread/yq89jm0szkcodfocm5x7vqnqdmh0h1l0]
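
To make the requested fix concrete: leader election uses `coordination.k8s.io`
Leases in the namespace where the operator itself runs, so that namespace
needs its own Role/RoleBinding even when `watchNamespaces` points elsewhere. A
hedged sketch (names and verbs are illustrative; the actual fix belongs in the
chart's templates):

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-operator-leases
  namespace: flink-operator        # the namespace the operator is deployed in
rules:
  - apiGroups: ["coordination.k8s.io"]
    resources: ["leases"]
    verbs: ["get", "list", "watch", "create", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-operator-leases
  namespace: flink-operator
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-operator-leases
subjects:
  - kind: ServiceAccount
    name: flink-operator           # the operator's ServiceAccount
    namespace: flink-operator
```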



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32041) flink-kubernetes-operator RoleBinding for Leases not created in correct namespace when using watchNamespaces

2023-05-09 Thread Tamir Sagi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721192#comment-17721192
 ] 

Tamir Sagi edited comment on FLINK-32041 at 5/10/23 5:43 AM:
-

Hey Gyula,

I also encountered something similar  (HA is enabled).

 
I checked the RoleBinding between the service account 
`dev-0-flink-clusters:dev-0-xsight-flink-operator-sa` and the corresponding 
role (*flink-operator*), which was created by the operator with 
`rbac.nodesRule.create=true`; they both look fine.

 

The operator watches 2 namespaces:
 # its own:  dev-0-flink-clusters
 # dev-0-flink-temp-clusters

[two attached screenshots show the Role and RoleBinding created by the operator]

Then I see:

{{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure
executing: GET at: https://172.20.0.1/api/v1/nodes. Message:
Forbidden! Configured service account doesn't have access. Service account may
have been revoked. nodes is forbidden: User
"system:serviceaccount:dev-0-flink-clusters:dev-0-xsight-flink-operator-sa"
cannot list resource "nodes" in API group "" at the cluster scope.}}

Could it be related to `kubernetes.rest-service.exposed.type`?


was (Author: JIRAUSER283777):
Hey Gyula,

 

I also encountered something similar (HA is enabled, with RBAC).
 
I checked the RoleBinding between the service account 
`dev-0-flink-clusters:dev-0-xsight-flink-operator-sa` and the corresponding 
role (*flink-operator*), which was created by the operator with 
`rbac.nodesRule.create=true`; they both look fine.

 

The operator watches 2 namespaces:
 # its own:  dev-0-flink-clusters
 # dev-0-flink-temp-clusters

[two attached screenshots show the Role and RoleBinding created by the operator]

Then I see:

{{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure
executing: GET at: https://172.20.0.1/api/v1/nodes. Message:
Forbidden! Configured service account doesn't have access. Service account may
have been revoked. nodes is forbidden: User
"system:serviceaccount:dev-0-flink-clusters:dev-0-xsight-flink-operator-sa"
cannot list resource "nodes" in API group "" at the cluster scope.}}

Could it be related to `kubernetes.rest-service.exposed.type`?

> flink-kubernetes-operator RoleBinding for Leases not created in correct 
> namespace when using watchNamespaces
> 
>
> Key: FLINK-32041
> URL: https://issues.apache.org/jira/browse/FLINK-32041
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Andrew Otto
>Assignee: Andrew Otto
>Priority: Major
>
> When enabling [HA for 
> flink-kubernetes-operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability]
>  RBAC rules must be created to allow the flink-operator to manage k8s Lease 
> resources. When not using {{watchNamespaces}}, the RBAC rules are created at 
> the k8s cluster scope, giving the flink-operator ServiceAccount the ability 
> to manage all needed k8s resources in all namespaces.
> However, when using {{watchNamespaces}}, RBAC rules are only created in the 
> {{watchNamespaces}}. For most rules this is correct, as the operator needs 
> to manage resources like Flink pods and deployments in the {{watchNamespaces}}.
> However, for flink-kubernetes-operator HA, the Lease resource is managed in 
> the same namespace in which the operator is deployed.
> The Helm chart should be fixed so that the proper RBAC rules for Leases are 
> created for the operator's ServiceAccount in the operator's namespace.
> Mailing list discussion 
> [here.|https://lists.apache.org/thread/yq89jm0szkcodfocm5x7vqnqdmh0h1l0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32041) flink-kubernetes-operator RoleBinding for Leases not created in correct namespace when using watchNamespaces

2023-05-09 Thread Tamir Sagi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721192#comment-17721192
 ] 

Tamir Sagi commented on FLINK-32041:


Hey Gyula,

 

I also encountered something similar (HA is enabled, with RBAC).
 
I checked the RoleBinding between the service account 
`dev-0-flink-clusters:dev-0-xsight-flink-operator-sa` and the corresponding 
role (*flink-operator*), which was created by the operator with 
`rbac.nodesRule.create=true`; they both look fine.

 

The operator watches 2 namespaces:
 # its own:  dev-0-flink-clusters
 # dev-0-flink-temp-clusters

[two attached screenshots show the Role and RoleBinding created by the operator]

Then I see:

{{org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure
executing: GET at: https://172.20.0.1/api/v1/nodes. Message:
Forbidden! Configured service account doesn't have access. Service account may
have been revoked. nodes is forbidden: User
"system:serviceaccount:dev-0-flink-clusters:dev-0-xsight-flink-operator-sa"
cannot list resource "nodes" in API group "" at the cluster scope.}}

Could it be related to `kubernetes.rest-service.exposed.type`?

> flink-kubernetes-operator RoleBinding for Leases not created in correct 
> namespace when using watchNamespaces
> 
>
> Key: FLINK-32041
> URL: https://issues.apache.org/jira/browse/FLINK-32041
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Andrew Otto
>Assignee: Andrew Otto
>Priority: Major
>
> When enabling [HA for 
> flink-kubernetes-operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability]
>  RBAC rules must be created to allow the flink-operator to manage k8s Lease 
> resources. When not using {{watchNamespaces}}, the RBAC rules are created at 
> the k8s cluster scope, giving the flink-operator ServiceAccount the ability 
> to manage all needed k8s resources in all namespaces.
> However, when using {{watchNamespaces}}, RBAC rules are only created in the 
> {{watchNamespaces}}. For most rules this is correct, as the operator needs 
> to manage resources like Flink pods and deployments in the {{watchNamespaces}}.
> However, for flink-kubernetes-operator HA, the Lease resource is managed in 
> the same namespace in which the operator is deployed.
> The Helm chart should be fixed so that the proper RBAC rules for Leases are 
> created for the operator's ServiceAccount in the operator's namespace.
> Mailing list discussion 
> [here.|https://lists.apache.org/thread/yq89jm0szkcodfocm5x7vqnqdmh0h1l0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] libenchao commented on a diff in pull request #22533: [FLINK-31687][jdbc-driver] Get rid of flink-core for jdbc driver

2023-05-09 Thread via GitHub


libenchao commented on code in PR #22533:
URL: https://github.com/apache/flink/pull/22533#discussion_r1189370293


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java:
##
@@ -46,7 +46,7 @@ static Executor create(
  *
  * @return the session configuration.
  */
-ReadableConfig getSessionConfig();
+Map<String, String> getSessionConfig();

Review Comment:
   Is this a public API? Although this is not clearly annotated as API, we 
may need to be careful about this change. CC @fsk119, what do you think about 
this, since you are the original author of this class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-31546) Close all statements when connection is closed

2023-05-09 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li closed FLINK-31546.
--
Fix Version/s: 1.18.0
 Assignee: Fang Yong
   Resolution: Fixed

Fixed via 
https://github.com/apache/flink/commit/400530d30af3d3d56805e9ab49397ca7778ba510

 [~zjureel] Thanks for your contribution!

> Close all statements when connection is closed
> --
>
> Key: FLINK-31546
> URL: https://issues.apache.org/jira/browse/FLINK-31546
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / JDBC
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Close all statements when connection is closed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] libenchao merged pull request #22536: [FLINK-31546][jdbc-driver] Close all statements when connection is closed

2023-05-09 Thread via GitHub


libenchao merged PR #22536:
URL: https://github.com/apache/flink/pull/22536


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] libenchao commented on a diff in pull request #22532: [FLINK-31548][jdbc-driver] Introduce FlinkDataSource

2023-05-09 Thread via GitHub


libenchao commented on code in PR #22532:
URL: https://github.com/apache/flink/pull/22532#discussion_r1189364154


##
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDataSource.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import javax.sql.DataSource;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+/** Basic Flink data source which creates {@link FlinkConnection}. */
+public class FlinkDataSource implements DataSource {
+    private final String url;
+    private final Properties properties;
+
+    public FlinkDataSource(String url, Properties properties) {
+        this.url = url;
+        this.properties = properties;
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        return new FlinkConnection(DriverUri.create(url, properties));
+    }
+
+    @Override
+    public Connection getConnection(String username, String password) throws SQLException {
+        return getConnection();

Review Comment:
   Is it expected to discard `user` and `password` parameters?
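
One way the review point could be addressed, as a minimal sketch: merge the
credentials into the connection properties instead of dropping them. The
`"user"`/`"password"` property keys are illustrative assumptions, not the
driver's confirmed contract:

```java
// Hedged sketch, not the PR's actual resolution: honor the JDBC credentials
// by overlaying them on the data source's base properties.
@Override
public Connection getConnection(String username, String password) throws SQLException {
    Properties merged = new Properties();
    merged.putAll(properties);
    if (username != null) {
        merged.setProperty("user", username);       // assumed key
    }
    if (password != null) {
        merged.setProperty("password", password);   // assumed key
    }
    return new FlinkConnection(DriverUri.create(url, merged));
}
```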



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #7: [FLINK-31408] Add support for EOS delivery-guarantee in upsert-kafka

2023-05-09 Thread via GitHub


tzulitai commented on code in PR #7:
URL: 
https://github.com/apache/flink-connector-kafka/pull/7#discussion_r1189345491


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TwoPhaseCommittingStatefulSink.java:
##
@@ -0,0 +1,34 @@
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A combination of {@link TwoPhaseCommittingSink} and {@link StatefulSink}.
+ *
+ * The purpose of this interface is to be able to pass an interface rather 
than a {@link
+ * KafkaSink} implementation into the reducing sink which simplifies unit 
testing.

Review Comment:
   I guess it's probably not only for unit testing? Without the mixin interface 
the `ReducingUpsertSink` and `ReducingUpsertWriter` wouldn't be able to 
properly wrap the KafkaSink / KafkaWriter delegates.



##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java:
##
@@ -32,19 +33,21 @@
  * A wrapper of a {@link Sink}. It will buffer the data emitted by the wrapper 
{@link SinkWriter}
  * and only emit it when the buffer is full or a timer is triggered or a 
checkpoint happens.
  *
- * The sink provides eventual consistency guarantees without the need of a 
two-phase protocol
- * because the updates are idempotent therefore duplicates have no effect.
+ * The sink provides eventual consistency guarantees without under {@link
+ * org.apache.flink.connector.base.DeliveryGuarantee#AT_LEAST_ONCE} because 
the updates are
+ * idempotent therefore duplicates have no effect.

Review Comment:
   ```suggestion
* The sink provides eventual consistency guarantees under {@link
* org.apache.flink.connector.base.DeliveryGuarantee#AT_LEAST_ONCE} because 
the updates are
* idempotent therefore duplicates have no effect.
   ```



##
docs/content/docs/connectors/table/upsert-kafka.md:
##
@@ -277,6 +293,19 @@ connector is working in the upsert mode, the last record 
on the same key will ta
 reading back as a source. Therefore, the upsert-kafka connector achieves 
idempotent writes just like
 the [HBase sink]({{< ref "docs/connectors/table/hbase" >}}).
 
+With Flink's checkpointing enabled, the `upsert-kafka` connector can provide 
exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose among three 
different modes of operation by passing the appropriate `sink.delivery-guarantee` 
option:
+
+* `none`: Flink will not guarantee anything. Produced records can be lost or 
they can be duplicated.
+* `at-least-once` (default setting): This guarantees that no records will be 
lost (although they can be duplicated).
+* `exactly-once`: Kafka transactions will be used to provide exactly-once 
semantic. Whenever you write
+  to Kafka using transactions, do not forget about setting desired 
`isolation.level` (`read_uncommitted`
+  or `read_committed` - the latter one is the default value) for any 
application consuming records
+  from Kafka.
+
+Please refer to [Kafka documentation]({{< ref 
"docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) for 
more caveats about delivery guarantees.

Review Comment:
   ```suggestion
   Please refer to [Kafka connector documentation]({{< ref 
"docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) for 
more caveats about delivery guarantees.
   ```



##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TwoPhaseCommittingStatefulSink.java:
##
@@ -0,0 +1,34 @@
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A combination of {@link TwoPhaseCommittingSink} and {@link StatefulSink}.
+ *
+ * The purpose of this interface is to be able to pass an interface rather 
than a {@link
+ * KafkaSink} implementation into the reducing sink which simplifies unit 
testing.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <WriterStateT> The type of the sink writer's state
+ * @param <CommT> The type of the committables.
+ */
+@Internal
+public interface TwoPhaseCommittingStatefulSink<InputT, WriterStateT, CommT>

Review Comment:
   I'm a bit torn whether or not we should just accept this awkwardness and 
move on. I understand the need for this "tag" mixin interface, but it just 
feels really odd.
   
   Interface mixins in Java really don't play well with the decorator pattern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [flink] 1996fanrui commented on pull request #22496: [FLINK-31959][checkpoint] Correct the unaligned checkpoint type at checkpoint level

2023-05-09 Thread via GitHub


1996fanrui commented on PR #22496:
URL: https://github.com/apache/flink/pull/22496#issuecomment-1541331915

   Thanks a lot for your review.
   
   > Thanks for the fix! LGTM. Have you verified manually that it's working in 
the web UI? Could you post a screenshot showing that, when the timeout didn't 
happen, the UI displays an aligned checkpoint?
   
   Sure, here is a recording of the checkpoint type. I didn't post a 
screenshot because the checkpoint type and the unaligned checkpoint 
configuration are on different pages.
   
   
https://drive.google.com/file/d/1fOYN79mmr1DMhRIpWIX6RRZTJjPHjT8m/view?usp=share_link


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31890) Introduce SchedulerBase per-task failure enrichment/labeling

2023-05-09 Thread Panagiotis Garefalakis (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Panagiotis Garefalakis updated FLINK-31890:
---
Summary: Introduce SchedulerBase per-task failure enrichment/labeling  
(was: Introduce JobMaster per-task failure enrichment/labeling)

> Introduce SchedulerBase per-task failure enrichment/labeling
> 
>
> Key: FLINK-31890
> URL: https://issues.apache.org/jira/browse/FLINK-31890
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-09 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721179#comment-17721179
 ] 

Hangxiang Yu commented on FLINK-31963:
--

Hi, [~pnowojski]. I am fairly sure that it may not be related to the unified 
file merging of unaligned checkpoints, because I hit the above exception in 1.15.
My job is a bit complicated, so I tried to simplify it to reproduce the issue, 
but I haven't managed to yet.
I will share more if I can reproduce it with a simple job or an ITCase.

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB (Generic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32042) Support calculate versions of all tables for job in planner

2023-05-09 Thread Fang Yong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fang Yong updated FLINK-32042:
--
Description: Set customized callback in planner for scan sources, for 
example, calculate snapshot id for different sources in the same job for data 
lake  (was: Set customized exec node graph processor in planner, for example, 
calculate snapshot id for different sources in the same job for data lake)

> Support calculate versions of all tables for job in planner
> ---
>
> Key: FLINK-32042
> URL: https://issues.apache.org/jira/browse/FLINK-32042
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>
> Set customized callback in planner for scan sources, for example, calculate 
> snapshot id for different sources in the same job for data lake



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32042) Support calculate versions of all tables for job in planner

2023-05-09 Thread Fang Yong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fang Yong updated FLINK-32042:
--
Summary: Support calculate versions of all tables for job in planner  (was: 
Support customized exec node graph processor for planner)

> Support calculate versions of all tables for job in planner
> ---
>
> Key: FLINK-32042
> URL: https://issues.apache.org/jira/browse/FLINK-32042
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>
> Set customized exec node graph processor in planner, for example, calculate 
> snapshot id for different sources in the same job for data lake



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30815) BatchTestBase/BatchAbstractTestBase are using JUnit4 while some child tests are using JUnit5

2023-05-09 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30815.
--
Fix Version/s: 1.18.0
   Resolution: Fixed

Fixed in master:

ed9ee279e50781b7bd2d85f1486721c02fc7e32b

7a423666d0f8452382ad5fe2635de5ad1475dd46

> BatchTestBase/BatchAbstractTestBase are using JUnit4 while some child tests 
> are using JUnit5
> 
>
> Key: FLINK-30815
> URL: https://issues.apache.org/jira/browse/FLINK-30815
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.16.0
>Reporter: Zhu Zhu
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> BatchTestBase/BatchAbstractTestBase are using JUnit4, while some child tests 
> (e.g. DynamicFilteringITCase) are using JUnit5. This may break some 
> assumptions and hide some problems.
> For example, a child test will create a MiniCluster by itself, instead of 
> using the MiniCluster (TM=1, slots=3) created in BatchAbstractTestBase. The 
> created MiniCluster may have more slots and hide resource deadlock issues.
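
For illustration, a hedged sketch of how a JUnit 5 base class can pin the
shared mini cluster so child tests cannot silently get a bigger one; the class
names come from flink-test-utils, and the TM=1/slots=3 numbers mirror the
description above:

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.extension.RegisterExtension;

// Hedged sketch, not the actual migrated base class from the PR.
abstract class BatchTestBaseSketch {
    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(3)
                            .build());
}
```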



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30815) BatchTestBase/BatchAbstractTestBase are using JUnit4 while some child tests are using JUnit5

2023-05-09 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30815:
--

Assignee: Yuxin Tan

> BatchTestBase/BatchAbstractTestBase are using JUnit4 while some child tests 
> are using JUnit5
> 
>
> Key: FLINK-30815
> URL: https://issues.apache.org/jira/browse/FLINK-30815
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.16.0
>Reporter: Zhu Zhu
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> BatchTestBase/BatchAbstractTestBase are using JUnit4, while some child tests 
> (e.g. DynamicFilteringITCase) are using JUnit5. This may break some 
> assumptions and hide some problems.
> For example, a child test will create a MiniCluster by itself, instead of 
> using the MiniCluster (TM=1, slots=3) created in BatchAbstractTestBase. The 
> created MiniCluster may have more slots and hide resource deadlock issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] godfreyhe closed pull request #22427: [FLINK-30815][tests] Migrate BatchAbstractTestBase and BatchTestBase to junit5

2023-05-09 Thread via GitHub


godfreyhe closed pull request #22427:  [FLINK-30815][tests] Migrate 
BatchAbstractTestBase and BatchTestBase to junit5
URL: https://github.com/apache/flink/pull/22427


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32042) Support customized exec node graph processor for planner

2023-05-09 Thread Fang Yong (Jira)
Fang Yong created FLINK-32042:
-

 Summary: Support customized exec node graph processor for planner
 Key: FLINK-32042
 URL: https://issues.apache.org/jira/browse/FLINK-32042
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Fang Yong


Set customized exec node graph processor in planner, for example, calculate 
snapshot id for different sources in the same job for data lake



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24595) Programmatic configuration of S3 doesn't pass parameters to Hadoop FS

2023-05-09 Thread Lu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721160#comment-17721160
 ] 

Lu commented on FLINK-24595:


I have the same problem in 1.15.2. It seems flink-s3-fs-hadoop reads the S3 
configuration from the environment, not from the Configuration object.

> Programmatic configuration of S3 doesn't pass parameters to Hadoop FS
> -
>
> Key: FLINK-24595
> URL: https://issues.apache.org/jira/browse/FLINK-24595
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 1.14.0
> Environment: Flink 1.14.0
> JDK 8 
> {{openjdk version "1.8.0_302"}}
> {{OpenJDK Runtime Environment (Zulu 8.56.0.23-CA-macos-aarch64) (build 
> 1.8.0_302-b08)}}
> {{OpenJDK 64-Bit Server VM (Zulu 8.56.0.23-CA-macos-aarch64) (build 
> 25.302-b08, mixed mode)}}
>Reporter: Pavel Penkov
>Priority: Major
> Attachments: FlinkApp.java, TickingSource.java, flink_exception.txt
>
>
> When running in mini-cluster mode Flink apparently doesn't pass S3 
> configuration to underlying Hadoop FS. With a code like this
> {code:java}
> Configuration conf = new Configuration();
> conf.setString("s3.endpoint", "http://localhost:4566;);
> conf.setString("s3.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
> conf.setString("s3.access.key", "harvester");
> conf.setString("s3.secret.key", "harvester");
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment(conf);
> {code}
> Application fails with an exception with most relevant error being {{Caused 
> by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials 
> provided by SimpleAWSCredentialsProvider 
> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : 
> com.amazonaws.SdkClientException: Failed to connect to service endpoint: }}
> So Hadoop lists all the providers but it should use only the one set in 
> configuration. Full project that reproduces this behaviour is available at 
> [https://github.com/PavelPenkov/flink-s3-conf] and relevant files are 
> attached to this issue.
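
A commonly suggested workaround, offered here as an assumption rather than a
verified fix for this ticket: initialize Flink's FileSystem stack with the
`s3.*` options explicitly before creating the local environment, so
flink-s3-fs-hadoop sees them instead of falling back to environment variables:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3ConfWorkaround {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("s3.endpoint", "http://localhost:4566");
        conf.setString("s3.access.key", "harvester");
        conf.setString("s3.secret.key", "harvester");
        // Push the options into the file system factories up front;
        // a null PluginManager falls back to classpath discovery.
        FileSystem.initialize(conf, null);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);
    }
}
```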



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32027) Batch jobs could hang at shuffle phase when max parallelism is really large

2023-05-09 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-32027.
--
Fix Version/s: 1.18.0
   Resolution: Fixed

master(1.18) via 63443aec09ece8596321328273c1e431e5029c4d.
release-1.17 via 8e5fb18ae5a80c4d0620979a944b017b203cdeac.
release-1.16 via c5a883d3976fc8367eba446790088ff46e59ab79.

> Batch jobs could hang at shuffle phase when max parallelism is really large
> ---
>
> Key: FLINK-32027
> URL: https://issues.apache.org/jira/browse/FLINK-32027
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0, 1.16.1
>Reporter: Yun Tang
>Assignee: Weijie Guo
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
> Attachments: image-2023-05-08-11-12-58-361.png
>
>
> In batch execution mode with the adaptive batch scheduler, if we set the max 
> parallelism as large as 32768 ({{pipeline.max-parallelism}}), the job could 
> hang at the shuffle phase.
> It would hang for a long time and show "No bytes sent":
>  !image-2023-05-08-11-12-58-361.png!
> After some debugging, we can see the downstream operator did not receive 
> the end-of-partition event.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa merged pull request #22553: [BP-1.16][FLINK-32027][runtime] Fix the potential concurrent reading bug of index file for SortMergeShuffle.

2023-05-09 Thread via GitHub


reswqa merged PR #22553:
URL: https://github.com/apache/flink/pull/22553


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa merged pull request #22554: [BP-1.17][FLINK-32027][runtime] Fix the potential concurrent reading bug of index file for SortMergeShuffle.

2023-05-09 Thread via GitHub


reswqa merged PR #22554:
URL: https://github.com/apache/flink/pull/22554


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] qinf commented on a diff in pull request #21963: [FLINK-31114][runtime] Set parallelism of job vertices in forward group at compilation phase

2023-05-09 Thread via GitHub


qinf commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1189291976


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -205,7 +216,7 @@ private StreamingJobGraphGenerator(
 this.defaultStreamGraphHasher = new StreamGraphHasherV2();
 this.legacyStreamGraphHashers = Arrays.asList(new 
StreamGraphUserHashHasher());
 
-this.jobVertices = new HashMap<>();
+this.jobVertices = new LinkedHashMap<>();

Review Comment:
   @wanglijie95 Yes, it seems to have no negative impact in the community 
version. But our internal version uses the HashMap jobVertices, and its 
iteration order differs from the LinkedHashMap's. We will try to fix it in our 
version. Thank you, Lijie.
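
A toy demonstration of the iteration-order difference at issue:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class IterationOrderDemo {
    public static void main(String[] args) {
        Map<String, Integer> hash = new HashMap<>();
        Map<String, Integer> linked = new LinkedHashMap<>();
        for (String k : new String[] {"source", "map", "sink"}) {
            hash.put(k, k.length());
            linked.put(k, k.length());
        }
        System.out.println(hash.keySet());   // hash order, e.g. [sink, source, map]
        System.out.println(linked.keySet()); // insertion order: [source, map, sink]
    }
}
```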



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] yangjf2019 commented on pull request #589: [hotfix][docs] Fix flink version in doc and yaml

2023-05-09 Thread via GitHub


yangjf2019 commented on PR #589:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/589#issuecomment-1541179768

   > Why not upgrade to the latest version: 1.17 directly?
   
   Thanks for your review. I wanted to update to the latest stable version, 
1.17.0, but considering the need to match the context of the documentation and 
test cases, I have not done so for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] X-czh commented on pull request #589: [hotfix][docs] Fix flink version in doc and yaml

2023-05-09 Thread via GitHub


X-czh commented on PR #589:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/589#issuecomment-1541168048

   Why not upgrade to the latest version: 1.17 directly?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] becketqin commented on a diff in pull request #22509: [FLINK-31983] Add yarn Acls capability to Flink containers

2023-05-09 Thread via GitHub


becketqin commented on code in PR #22509:
URL: https://github.com/apache/flink/pull/22509#discussion_r1189255125


##
docs/layouts/shortcodes/generated/yarn_config_configuration.html:
##
@@ -182,5 +182,15 @@
 String
 Specify YARN node label for the Flink TaskManagers, it will 
override the yarn.application.node-label for TaskManagers if both are set.
 
+
+  yarn.view.acls
+  (none)
+  Users and groups to give VIEW access. The ACLs are of the form 
comma-separated-users<space>comma-separated-groups
+
+
+  yarn.modify.acls
+  (none)
+  Users and groups to give MODIFY access. The ACLs are of the form 
comma-separated-users<space>comma-separated-groups

Review Comment:
   The format "comma-separated-usersspacecomma-separated-groups" looks a little 
weird.  Can we use "comma-separated-usersspacecomma-separated-groups"?



##
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##
@@ -231,6 +231,25 @@ public class YarnConfigOptions {
 .withDescription(
 "A comma-separated list of tags to apply to the 
Flink YARN application.");
 
+/**
+ * Users and groups to give VIEW access.
+ * 
https://www.cloudera.com/documentation/enterprise/latest/topics/cm_mc_yarn_acl.html
+ */
+public static final ConfigOption<String> APPLICATION_VIEW_ACLS =
+key("yarn.view.acls")
+.defaultValue("")

Review Comment:
   We need to define the type first.
   ```
   key("yarn.view.acls")
   .stringType()
   .defaultValue("")
   ```



##
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##
@@ -231,6 +231,25 @@ public class YarnConfigOptions {
 .withDescription(
 "A comma-separated list of tags to apply to the 
Flink YARN application.");
 
+/**
+ * Users and groups to give VIEW access.
+ * 
https://www.cloudera.com/documentation/enterprise/latest/topics/cm_mc_yarn_acl.html
+ */
+public static final ConfigOption<String> APPLICATION_VIEW_ACLS =
+key("yarn.view.acls")
+.defaultValue("")
+.withDescription(
+"Users and groups to give VIEW access. The ACLs 
are of for"
++ " 
comma-separated-usersspacecomma-separated-groups");
+
+/** Users and groups to give MODIFY access. */
+public static final ConfigOption<String> APPLICATION_MODIFY_ACLS =
+key("yarn.modify.acls")
+.defaultValue("")

Review Comment:
   Ditto above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25568) Add Elasticsearch 7 Source Connector

2023-05-09 Thread xingyuan cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721136#comment-17721136
 ] 

xingyuan cheng commented on FLINK-25568:


Hi, community. It seems that this issue has not progressed for a long time. 
My company has requirements related to a source connector for Elasticsearch; 
can I continue to push this work forward?

> Add Elasticsearch 7 Source Connector
> 
>
> Key: FLINK-25568
> URL: https://issues.apache.org/jira/browse/FLINK-25568
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / ElasticSearch
>Reporter: Alexander Preuss
>Assignee: Alexander Preuss
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> We want to support not only Sink but also Source for Elasticsearch. As a 
> first step we want to add a ScanTableSource for Elasticsearch 7.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-16713) Support source mode of elasticsearch connector

2023-05-09 Thread xingyuan cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721135#comment-17721135
 ] 

xingyuan cheng commented on FLINK-16713:


Hi, community. It seems that this issue has not progressed for a long time. 
My company has requirements related to a source connector for Elasticsearch; 
can I continue to push this work forward?

> Support source mode of elasticsearch connector
> --
>
> Key: FLINK-16713
> URL: https://issues.apache.org/jira/browse/FLINK-16713
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.10.0
>Reporter: jackray wang
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector]
> For append-only queries, the connector can also operate in [append 
> mode|https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes]
>  for exchanging only INSERT messages with the external system. If no key is 
> defined by the query, a key is automatically generated by Elasticsearch.
> I want to know why the Flink connector for ES only supports sink but 
> doesn't support source. Which version could this feature be added to?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] clownxc commented on pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-09 Thread via GitHub


clownxc commented on PR #22555:
URL: https://github.com/apache/flink/pull/22555#issuecomment-1541110008

   Could you spare some time to review? Thanks very much, @Myasuka.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-05-09 Thread via GitHub


flinkbot commented on PR #22558:
URL: https://github.com/apache/flink/pull/22558#issuecomment-1540875174

   
   ## CI report:
   
   * 69b4b22e38f67abe10d98d7069435855fa3b0dc3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28744) Upgrade Calcite version to 1.31

2023-05-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28744:
---
Labels: pull-request-available  (was: )

> Upgrade Calcite version to 1.31
> ---
>
> Key: FLINK-28744
> URL: https://issues.apache.org/jira/browse/FLINK-28744
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Reporter: Martijn Visser
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> We should upgrade to Calcite 1.31 so we can benefit from 
> https://issues.apache.org/jira/browse/CALCITE-4865



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin opened a new pull request, #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-05-09 Thread via GitHub


snuyanzin opened a new pull request, #22558:
URL: https://github.com/apache/flink/pull/22558

   ## What is the purpose of the change
   
   Upgrade Calcite to 1.31.0
   
   TBD
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
 - The S3 file system connector: (no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable )
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] venkata91 commented on a diff in pull request #22509: [FLINK-31983] Add yarn Acls capability to Flink containers

2023-05-09 Thread via GitHub


venkata91 commented on code in PR #22509:
URL: https://github.com/apache/flink/pull/22509#discussion_r1189085613


##
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java:
##
@@ -61,6 +61,9 @@
 class YARNSessionFIFOITCase extends YarnTestBase {
 private static final Logger log = 
LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
 
+protected static final String VIEW_ACLS = "user group";
+protected static final String MODIFY_ACLS = "admin groupAdmin";

Review Comment:
   Can we add a test for `wildcard` as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-playgrounds] alainbrown commented on pull request #42: Create flink_data volume for operations playground.

2023-05-09 Thread via GitHub


alainbrown commented on PR #42:
URL: https://github.com/apache/flink-playgrounds/pull/42#issuecomment-1540825166

   Actually, I think it's quite the opposite. You can more easily use 
checkpoints and savepoints via volumes. Data is persisted, easily backed up, and 
shared between containers. You can push around files safely via the CLI or 
pre-populate volumes from a container.
   
   Moreover, I don't think the current master would even work on a Windows 
host OS at all, which is an antipattern for Docker generally.
   
   Quick summary of the benefits of volumes over bind mounts (from docker 
[site](https://docs.docker.com/storage/volumes/)):
   
   Volumes are easier to back up or migrate than bind mounts.
   You can manage volumes using Docker CLI commands or the Docker API.
   Volumes work on both Linux and Windows containers.
   Volumes can be more safely shared among multiple containers.
   Volume drivers let you store volumes on remote hosts or cloud providers, to 
encrypt the contents of volumes, or to add other functionality.
   New volumes can have their content pre-populated by a container.
   Volumes on Docker Desktop have much higher performance than bind mounts from 
Mac and Windows hosts.
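
   For illustration, a hedged docker-compose sketch of the named-volume
approach (service names, image tag, and mount paths are placeholders, not the
playground's actual file):

   ```yaml
   services:
     jobmanager:
       image: flink:1.16-scala_2.12           # placeholder tag
       volumes:
         - flink_data:/tmp/flink-checkpoints
     taskmanager:
       image: flink:1.16-scala_2.12
       volumes:
         - flink_data:/tmp/flink-checkpoints  # same volume, shared between containers
   volumes:
     flink_data:
   ```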


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on pull request #585: [FLINK-32005] Add a per-deployment error metric to signal about potential issues

2023-05-09 Thread via GitHub


mxm commented on PR #585:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/585#issuecomment-1540761289

   The latest commit contains a refactor and adds the successful scaling metric.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] architgyl commented on a diff in pull request #22509: [FLINK-31983] Add yarn Acls capability to Flink containers

2023-05-09 Thread via GitHub


architgyl commented on code in PR #22509:
URL: https://github.com/apache/flink/pull/22509#discussion_r1189036465


##
flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java:
##
@@ -620,4 +623,30 @@ public static YarnConfiguration getYarnConfiguration(
 
 return yarnConfig;
 }
+
+/**
+ * Sets the application ACLs for the given ContainerLaunchContext based on 
the values specified
+ * in the given Flink configuration. Only ApplicationAccessType.VIEW_APP 
and
+ * ApplicationAccessType.MODIFY_APP ACLs are set, and only if they are 
configured in the Flink
+ * configuration.
+ *
+ * @param amContainer the ContainerLaunchContext to set the ACLs for
+ * @param flinkConfig the Flink configuration to read the ACL values from
+ */
+public static void setAclsFor(
+ContainerLaunchContext amContainer,
+org.apache.flink.configuration.Configuration flinkConfig) {
+Map<ApplicationAccessType, String> acls = new HashMap<>();
+String viewAcls = 
flinkConfig.getString(YarnConfigOptions.APPLICATION_VIEW_ACLS, null);
+String modifyAcls = 
flinkConfig.getString(YarnConfigOptions.APPLICATION_MODIFY_ACLS, null);
+if (viewAcls != null) {
+acls.put(ApplicationAccessType.VIEW_APP, viewAcls);
+}
+if (modifyAcls != null) {
+acls.put(ApplicationAccessType.MODIFY_APP, modifyAcls);
+}
+if (!acls.isEmpty()) {
+amContainer.setApplicationACLs(acls);

Review Comment:
   Made the change and added special handling for wildcards.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm merged pull request #586: [FLINK-32002] Adjust autoscaler defaults for release

2023-05-09 Thread via GitHub


mxm merged PR #586:
URL: https://github.com/apache/flink-kubernetes-operator/pull/586


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #585: [FLINK-32005] Add a per-deployment error metric to signal about potential issues

2023-05-09 Thread via GitHub


mxm commented on code in PR #585:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/585#discussion_r1188996818


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##
@@ -82,8 +85,11 @@ public boolean scale(FlinkResourceContext<CR> ctx) {
 
         var conf = ctx.getObserveConfig();
         var resource = ctx.getResource();
+        var resourceId = ResourceID.fromResource(resource);
+        var autoscalerMetricGroup = ctx.getResourceMetricGroup().addGroup("AutoScaler");

Review Comment:
   Actually this one should probably just be added when the autoscaler is 
enabled.
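   
   A minimal sketch of that suggestion (assuming the existing 
`AutoScalerOptions.AUTOSCALER_ENABLED` flag; this is not the actual patch):
   ```java
   // Hypothetical guard: only create the metric group when autoscaling is on.
   if (conf.get(AutoScalerOptions.AUTOSCALER_ENABLED)) {
       var autoscalerMetricGroup = ctx.getResourceMetricGroup().addGroup("AutoScaler");
       // ... register the autoscaler metrics on autoscalerMetricGroup ...
   }
   ```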



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on pull request #585: [FLINK-32005] Add a per-deployment error metric to signal about potential issues

2023-05-09 Thread via GitHub


mxm commented on PR #585:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/585#issuecomment-1540684297

   I'm going to follow up with the scaling counter which also includes a 
refactor.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #586: [FLINK-32002] Adjust autoscaler defaults for release

2023-05-09 Thread via GitHub


mxm commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1188960604


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
 public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
         autoScalerConfig("target.utilization.boundary")
                 .doubleType()
-                .defaultValue(0.1)
+                .defaultValue(0.4)

Review Comment:
   Rephrased.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on pull request #581: [FLINK-31936] Support setting scale up max factor

2023-05-09 Thread via GitHub


mxm commented on PR #581:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/581#issuecomment-1540617992

   No more comments. Thanks @X-czh!  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22557: [FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml

2023-05-09 Thread via GitHub


flinkbot commented on PR #22557:
URL: https://github.com/apache/flink/pull/22557#issuecomment-1540614988

   
   ## CI report:
   
   * 634b92300c3a2beb191582780ea0720fb792d2cf UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on pull request #22557: [FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml

2023-05-09 Thread via GitHub


reswqa commented on PR #22557:
URL: https://github.com/apache/flink/pull/22557#issuecomment-1540610679

   After this PR is approved, I will go to the external connector repository to 
adapt it to this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32024) Short code related to externalized connector retrieve version from its own data yaml

2023-05-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32024:
---
Labels: pull-request-available  (was: )

> Short code related to externalized connector retrieve version from its own 
> data yaml
> 
>
> Key: FLINK-32024
> URL: https://issues.apache.org/jira/browse/FLINK-32024
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.18.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have some shortcodes specifically designed for externalized 
> connector, such as {{connectors_artifact}}, {{sql_connector_download_table}}, 
> etc.
> When using them, we need to pass in a version number, such as 
> {{sql_connector_download_table "pulsar" 3.0.0}}.  It's easy for us to forget 
> to modify the corresponding version in the document when releasing a new 
> version.
> Of course, we can hard code these into the release process. But perhaps we 
> can introduce a version field to {{docs/data/connector_name.yml}} and let 
> Flink directly read the corresponding version when rendering the shortcode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa opened a new pull request, #22557: [FLINK-32024][docs] Short code related to externalized connector retrieve version from its own data yaml

2023-05-09 Thread via GitHub


reswqa opened a new pull request, #22557:
URL: https://github.com/apache/flink/pull/22557

   ## What is the purpose of the change
   
   *Currently, we have some shortcodes specifically designed for externalized 
connector, such as `connectors_artifact`, `sql_connector_download_table`, etc.*
   
   *When using them, we need to pass in a version number, such as 
`sql_connector_download_table "pulsar" 3.0.0`. It's easy for us to forget to 
modify the corresponding version in the document when releasing a new version.*
   
   *Of course, we can hard code these into the release process. But perhaps we 
can introduce a version field to `docs/data/connector_name.yml` and let Flink 
directly read the corresponding version when rendering the shortcode.*
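   
   *A minimal sketch of the proposed data file, assuming a `version` field (file 
name and contents are illustrative, not the final design):*
   ```yaml
   # docs/data/pulsar.yml (hypothetical)
   version: 3.0.0
   ```
   *A shortcode such as `sql_connector_download_table "pulsar"` could then 
resolve `3.0.0` from this yaml instead of taking it as an argument.*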
   
   
   ## Brief change log
   
 - *Let short codes related to externalized connector retrieve version from 
its own data yaml.*
   
   
   ## Verifying this change
   
   Tested manually in my local env.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-09 Thread via GitHub


reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1540595241

   > One more thing, have you maybe manually confirmed that the bug fix is 
working after all of the changes in this PR.
   
   Yes, I manually tested the topology described in 
[FLINK-31852](https://issues.apache.org/jira/browse/FLINK-31852) and 
[FLINK-18808](https://issues.apache.org/jira/browse/FLINK-18808), and it seems 
to be in line with expectations.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-09 Thread via GitHub


reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188900545


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)

Review Comment:
   When writing this test case, I felt that covering just one of the two tags 
was sufficient. In this test case, no matter how the data is sent, only one 
`OutputTag` (`Odd` or `Even`) will actually be hit; the two are symmetric, so 
this does not affect the coverage of this test.
   
   What I actually want to consider here is the scenario where both 
`RecordWriter Without Tag` and `RecordWriter With Tag` exist, which is also the 
actual topology of this case. For more details, please refer to the picture 
attached in the other comments.
   
   Of course, if you think it is necessary to cover all tags here, I think it 
also makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-09 Thread via GitHub


reswqa commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1540595646

   Thanks @pnowojski for the review!
   
   I have resolved some of the comments, and replied to the rest in comments. I 
will update promptly after receiving feedback.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-09 Thread via GitHub


reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188921664


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   > ditto for the multi and two input case. (nit: Also it would help if you 
could extract the components of the total expected output records count to 
separate local/named variables)
   
   This is really a good suggestion.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-09 Thread via GitHub


reswqa commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188891342


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
+                        .addNonChainedOutputsCount(1)
+                        .build()
+                        .finish()
+                        .setTaskMetricGroup(taskMetricGroup)
+                        .build()) {
+            Counter numRecordsInCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+            Counter numRecordsOutCounter =
+                    taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+            final int numEvenRecords = 5;
+            final int numOddRecords = 3;
+
+            for (int x = 0; x < numEvenRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x));
+            }
+
+            for (int x = 0; x < numOddRecords; x++) {
+                testHarness.processElement(new StreamRecord<>(2 * x + 1));
+            }
+            assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
+            assertEquals(
+                    numOddRecords
+                            + (numOddRecords + numEvenRecords)
+                            + (numOddRecords + numEvenRecords) * 2,
+                    numRecordsOutCounter.getCount());

Review Comment:
   The topology graph of this test case is as follows:
   
![image](https://github.com/apache/flink/assets/19502505/cdae0246-da53-4860-b3c8-9f2d62dfe559)
   - The first operator does not have `NonChainedOutput`.
   - The second operator has two `RecordWriter With Odd Tag`, created by this 
code: `.addNonChainedOutputsCount(new OutputTag<>("odd", 
BasicTypeInfo.INT_TYPE_INFO), 2)`
   
   The assertion value (`numOddRecords + (numOddRecords + numEvenRecords) + 
(numOddRecords + numEvenRecords) * 2`) comes from the following three parts:
   
   - `numOddRecords`  from the second `OddEvenOperator`'s two 
`RecordWriterOutput With Odd Tag`. It is only calculated once here, which is 
guaranteed by the logic of `BroadcastingOutputCollector`.
   - `numOddRecords + numEvenRecords` from the second `OddEvenOperator`'s 
`RecordWriterOutput`.
   - `(numOddRecords + numEvenRecords) * 2` from the `DuplicatingOperator`
   
   It should be noted here that:
   - `OddEvenOperator` will send duplicated data to the side output and normal 
output. So we have the first two parts above. 
   ```
public void processElement(StreamRecord<Integer> element) {
    if (element.getValue() % 2 == 0) {
        output.collect(evenOutputTag, element);
    } else {
        output.collect(oddOutputTag, element);
    }
    output.collect(element);
}
   ```
   - `ChainedOutput` will ignore the data with output tag as we don't set 
`outputTag` for chaining `StreamEdge`. So subsequent operators will not receive 
duplicate data.
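   
   With the test's concrete values (`numEvenRecords` = 5, `numOddRecords` = 3), 
this works out to 3 + 8 + 16 = 27 expected output records.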



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #581: [FLINK-31936] Support setting scale up max factor

2023-05-09 Thread via GitHub


gyfora commented on PR #581:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/581#issuecomment-1540552443

   @mxm do you have any more concerns/comments? Or can we merge this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22556: [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode

2023-05-09 Thread via GitHub


flinkbot commented on PR #22556:
URL: https://github.com/apache/flink/pull/22556#issuecomment-1540548453

   
   ## CI report:
   
   * e72addaa542c5ec6b8afd87536d56c415ea12db4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #21849: [FLINK-30596][Runtime/REST] Fix duplicate jobs when submitting with the same jobId

2023-05-09 Thread via GitHub


zentol commented on code in PR #21849:
URL: https://github.com/apache/flink/pull/21849#discussion_r1188871040


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##
@@ -1513,13 +1517,20 @@ private CompletableFuture<Void> waitForTerminatingJob(
 throwable));
 });
 
-return jobManagerTerminationFuture.thenAcceptAsync(
+// keep track of the job as outstanding, if not done
+if (!jobManagerTerminationFuture.isDone()) {
+submittedAndWaitingTerminationJobIDs.add(jobId);
+}

Review Comment:
   This doesn't seem safe; there's no guarantee the termination future doesn't 
complete a millisecond after the condition, so any submission with the same job 
ID could still fail.



##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##
@@ -1513,13 +1517,20 @@ private CompletableFuture<Void> waitForTerminatingJob(
 throwable));
 });
 
-return jobManagerTerminationFuture.thenAcceptAsync(
+// keep track of the job as outstanding, if not done
+if (!jobManagerTerminationFuture.isDone()) {
+submittedAndWaitingTerminationJobIDs.add(jobId);
+}
+
+return FutureUtils.thenAcceptAsyncIfNotDone(

Review Comment:
   AFAICT this change alone, without any of the other stuff, would already 
solve the issue in question.
   If no JM exists for this job then this runs synchronously in the main thread.



##
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##
@@ -232,6 +233,27 @@ public void 
testDuplicateJobSubmissionWithGloballyTerminatedAndCleanedJob() thro
 assertDuplicateJobSubmission();
 }
 
+@Test
+public void testDuplicateJobSubmissionIsDetected() throws Exception {

Review Comment:
   Isn't this test just duplicating existing tests and not really targeting the 
case we're interested in? You want a test that actually delays the 
persistAndRunJob process.



##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##
@@ -1513,13 +1517,20 @@ private CompletableFuture<Void> waitForTerminatingJob(
 throwable));
 });
 
-return jobManagerTerminationFuture.thenAcceptAsync(
+// keep track of the job as outstanding, if not done
+if (!jobManagerTerminationFuture.isDone()) {
+submittedAndWaitingTerminationJobIDs.add(jobId);
+}
+
+return FutureUtils.thenAcceptAsyncIfNotDone(
+jobManagerTerminationFuture,
+getMainThreadExecutor(),
 FunctionUtils.uncheckedConsumer(
 (ignored) -> {
+submittedAndWaitingTerminationJobIDs.remove(jobId);

Review Comment:
   This isn't run if the submission fails, leaking memory and breaking all 
subsequent job submission attempts with a fixed job id.
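   
   A self-contained sketch of the pattern both comments point at (illustrative 
only, not the actual Dispatcher code): register the id unconditionally and 
remove it in `whenComplete`, so cleanup also runs when the submission fails and 
there is no `isDone()` check to race against.
   ```java
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   
   public class TrackingDemo {
       private static final Set<String> outstanding = ConcurrentHashMap.newKeySet();
   
       static CompletableFuture<Void> submitAfter(CompletableFuture<Void> termination, String jobId) {
           outstanding.add(jobId); // unconditional: no isDone() check to race against
           return termination
                   .thenAccept(ignored -> System.out.println("submitting " + jobId))
                   .whenComplete((ignored, t) -> outstanding.remove(jobId)); // also runs on failure
       }
   
       public static void main(String[] args) {
           CompletableFuture<Void> termination = new CompletableFuture<>();
           CompletableFuture<Void> submission = submitAfter(termination, "job-1");
           termination.complete(null); // simulate the previous job manager terminating
           submission.join();
           System.out.println("outstanding after completion: " + outstanding); // []
       }
   }
   ```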



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32030) SQL Client gateway mode should accept URLs

2023-05-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32030:
---
Labels: pull-request-available  (was: )

> SQL Client gateway mode should accept URLs
> --
>
> Key: FLINK-32030
> URL: https://issues.apache.org/jira/browse/FLINK-32030
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client, Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the _--endpoint_ parameter has to be specified in the 
> _InetSocketAddress_  format, i.e. _hostname:port._ While this works fine for 
> basic use cases, it does not support the placement of the gateway behind a 
> proxy or using an Ingress for routing to a specific Flink cluster based on 
> the URL path.  I.e. it expects 
> _[some.hostname.com:9001|http://some.hostname.com:9001/]_  to directly serve 
> requests on _[some.hostname.com:9001/v1|http://some.hostname.com:9001/v1]_ . 
> Mapping to a non-root location, i.e. 
> _[some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/v1|http://some.hostname.com:9001/flink-clusters/sql-preview-cluster-1/v1]_
>   is not supported.
>  
> Since the client talks to the gateway via its REST endpoint, the right format 
> for the _--endpoint_  parameter is {_}URL{_}, not _InetSocketAddress_ .  
> The same _--endpoint_ parameter can be reused if the changes are implemented 
> in a backwards-compatible way.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] afedulov opened a new pull request, #22556: [FLINK-32030][sql-client] Add URLs support for SQL Client gateway mode

2023-05-09 Thread via GitHub


afedulov opened a new pull request, #22556:
URL: https://github.com/apache/flink/pull/22556

   ## What is the purpose of the change
   
   This pull request adds support for URLs in SQL Client gateway mode. The 
changes are backwards compatible and still allow passing `hostname:port` as the 
`--endpoint` parameter.
   https://issues.apache.org/jira/browse/FLINK-32030
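   
   A minimal sketch of the backwards-compatible parsing (helper and class names 
are hypothetical, not the code in this PR):
   ```java
   import java.net.MalformedURLException;
   import java.net.URL;
   
   public class EndpointParseDemo {
       static URL parseEndpoint(String endpoint) throws MalformedURLException {
           if (endpoint.contains("://")) {
               return new URL(endpoint); // full URL, may carry a path prefix
           }
           // Legacy InetSocketAddress-style input, e.g. "some.hostname.com:9001".
           return new URL("http://" + endpoint);
       }
   
       public static void main(String[] args) throws MalformedURLException {
           System.out.println(parseEndpoint("some.hostname.com:9001"));
           System.out.println(parseEndpoint(
                   "http://some.hostname.com:9001/flink-clusters/sql-preview-cluster-1"));
       }
   }
   ```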
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added integration tests using a URL as the new `--endpoint` parameter
 - Manually verified connectivity and interactions with a SQL Gateway that 
is exposed using a proxy,  and includes a URL path 
(`foo.com:8083/some-path/v1/info`)
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - The documentation will be added upon general PR approval
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mohsenrezaeithe commented on pull request #21849: [FLINK-30596][Runtime/REST] Fix duplicate jobs when submitting with the same jobId

2023-05-09 Thread via GitHub


mohsenrezaeithe commented on PR #21849:
URL: https://github.com/apache/flink/pull/21849#issuecomment-1540497994

   @MartijnVisser this is a relatively important fix to the REST API. Could 
you please help me find other committers who may be available to take a look 
at the changes?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31734) Align behaviour of REST API / WEB UI feature flags

2023-05-09 Thread xiaochen zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720987#comment-17720987
 ] 

xiaochen zhou commented on FLINK-31734:
---

Hi, I would like to give this a try. Can I take this ticket? I will try my 
best to complete it.
 

> Align behaviour of REST API / WEB UI feature flags
> --
>
> Key: FLINK-31734
> URL: https://issues.apache.org/jira/browse/FLINK-31734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration, Runtime / REST, Runtime / Web 
> Frontend
>Reporter: David Morávek
>Priority: Major
>
> Currently we have following three feature flags for the web UI and REST API:
> 1) web.submit.enabled
> 2) web.cancel.enabled
> 3) web.rescale.enabled
>  
> 2) and 3) only hide the web UI elements, while 1) also removes the REST API 
> endpoint; We should introduce equivalent options for the REST API 
> (rest.xxx.enabled), which would give the user flexibility to choose whether 
> both REST API and WEB UI should be disabled for a given feature.
>  
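> A sketch of how the existing and proposed flags could combine (the 
> rest.xxx.enabled names follow the proposal above and are hypothetical):
> {code}
> web.cancel.enabled: false    # today: only hides the cancel button in the web UI
> rest.cancel.enabled: false   # proposed: would also disable the REST endpoint
> {code}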



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2023-05-09 Thread Jayme Howard (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720981#comment-17720981
 ] 

Jayme Howard commented on FLINK-31860:
--

I *think* that only didn't work because of per-namespace permissions.  If 
cluster-wide permissions are in place, I would expect this to still work?

> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Jayme Howard
>Priority: Blocker
>  Labels: pull-request-available
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: |
>       # This affects logging for both user code and Flink
>       rootLogger.level = INFO
>       rootLogger.appenderRef.console.ref = ConsoleAppender
>       rootLogger.appenderRef.rolling.ref = RollingFileAppender
>       # Uncomment this if you want to _only_ change Flink's logging
>       #logger.flink.name = org.apache.flink
>       #logger.flink.level = INFO
>       # The following lines keep the log level of common libraries/connectors on
>       # log level INFO. The root logger does not override this. You have to manually
>       # change the log levels here.
>       logger.akka.name = akka
>       logger.akka.level = INFO
>       logger.kafka.name = org.apache.kafka
>       logger.kafka.level = INFO
>       logger.hadoop.name = org.apache.hadoop
>       logger.hadoop.level = INFO
>       logger.zookeeper.name = org.apache.zookeeper
>       logger.zookeeper.level = INFO
>       # Log all infos to the console
>       appender.console.name = ConsoleAppender
>       appender.console.type = CONSOLE
>       appender.console.layout.type = PatternLayout
>       appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>       # Log all infos in the given rolling file
>       appender.rolling.name = RollingFileAppender
>       appender.rolling.type = RollingFile
>       appender.rolling.append = false
>       appender.rolling.fileName = ${sys:log.file}
>       appender.rolling.filePattern = ${sys:log.file}.%i
>       appender.rolling.layout.type = PatternLayout
> 

[GitHub] [flink] flinkbot commented on pull request #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-09 Thread via GitHub


flinkbot commented on PR #22555:
URL: https://github.com/apache/flink/pull/22555#issuecomment-1540449343

   
   ## CI report:
   
   * 49c1f049710af5f67d78a5fa3871c3ace831c435 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2023-05-09 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720973#comment-17720973
 ] 

Gyula Fora commented on FLINK-31860:


[~isugimpy] we can definitely add this improvement, but it didn't completely fix 
it for me: I was still getting an error from the Java Operator SDK itself, as it 
cannot finish the deletion of the CR (cannot remove the finalizer)

> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: |
>       # This affects logging for both user code and Flink
>       rootLogger.level = INFO
>       rootLogger.appenderRef.console.ref = ConsoleAppender
>       rootLogger.appenderRef.rolling.ref = RollingFileAppender
>       # Uncomment this if you want to _only_ change Flink's logging
>       #logger.flink.name = org.apache.flink
>       #logger.flink.level = INFO
>       # The following lines keep the log level of common libraries/connectors on
>       # log level INFO. The root logger does not override this. You have to manually
>       # change the log levels here.
>       logger.akka.name = akka
>       logger.akka.level = INFO
>       logger.kafka.name = org.apache.kafka
>       logger.kafka.level = INFO
>       logger.hadoop.name = org.apache.hadoop
>       logger.hadoop.level = INFO
>       logger.zookeeper.name = org.apache.zookeeper
>       logger.zookeeper.level = INFO
>       # Log all infos to the console
>       appender.console.name = ConsoleAppender
>       appender.console.type = CONSOLE
>       appender.console.layout.type = PatternLayout
>       appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>       # Log all infos in the given rolling file
>       appender.rolling.name = RollingFileAppender
>       appender.rolling.type = RollingFile
>       appender.rolling.append = false
>       appender.rolling.fileName = ${sys:log.file}
>       appender.rolling.filePattern =

[jira] [Updated] (FLINK-31706) The default source parallelism should be the same as execution's default parallelism under adaptive batch scheduler

2023-05-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31706:
---
Labels: pull-request-available  (was: )

> The default source parallelism should be the same as execution's default 
> parallelism under adaptive batch scheduler
> ---
>
> Key: FLINK-31706
> URL: https://issues.apache.org/jira/browse/FLINK-31706
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Currently, the sources need to set 
> {{execution.batch.adaptive.auto-parallelism.default-source-parallelism }} in 
> the adaptive batch scheduler mode, otherwise, the source parallelism is only 
> 1 by default. A better solution might be to fall back to the execution's 
> default parallelism when the user has not configured one. 
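>  
> A minimal sketch of the proposed fallback (constant names are assumptions, 
> not an actual patch):
> {code:java}
> // Prefer the source-specific option if set, otherwise fall back to the
> // execution's default parallelism instead of 1.
> int defaultSourceParallelism =
>         configuration
>                 .getOptional(
>                         BatchExecutionOptions
>                                 .ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM)
>                 .orElse(configuration.get(CoreOptions.DEFAULT_PARALLELISM));
> {code}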



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31860) FlinkDeployments never finalize when namespace is deleted

2023-05-09 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-31860:
--

Assignee: Jayme Howard  (was: Gyula Fora)

> FlinkDeployments never finalize when namespace is deleted
> -
>
> Key: FLINK-31860
> URL: https://issues.apache.org/jira/browse/FLINK-31860
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>Reporter: Jayme Howard
>Assignee: Jayme Howard
>Priority: Blocker
>  Labels: pull-request-available
>
> This appears to be a pretty straightforward issue, but I don't know the 
> codebase well enough to propose a fix.  When a FlinkDeployment is present in 
> a namespace, and the namespace is deleted, the FlinkDeployment never 
> reconciles and fails to complete its finalizer.  This leads to the namespace 
> being blocked from deletion indefinitely, requiring manual manipulation to 
> remove the finalizer on the FlinkDeployment.
>  
> Namespace conditions:
> {code:java}
> conditions:
> - lastTransitionTime: '2023-04-18T22:17:48Z'
>   message: All resources successfully discovered
>   reason: ResourcesDiscovered
>   status: 'False'
>   type: NamespaceDeletionDiscoveryFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All legacy kube types successfully parsed
>   reason: ParsedGroupVersions
>   status: 'False'
>   type: NamespaceDeletionGroupVersionParsingFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: All content successfully deleted, may be waiting on finalization
>   reason: ContentDeleted
>   status: 'False'
>   type: NamespaceDeletionContentFailure
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some resources are remaining: flinkdeployments.flink.apache.org 
> has 2
> resource instances'
>   reason: SomeResourcesRemain
>   status: 'True'
>   type: NamespaceContentRemaining
> - lastTransitionTime: '2023-03-23T18:27:37Z'
>   message: 'Some content in the namespace has finalizers remaining: 
> flinkdeployments.flink.apache.org/finalizer
> in 2 resource instances'
>   reason: SomeFinalizersRemain
>   status: 'True'
>   type: NamespaceFinalizersRemaining
> phase: Terminating {code}
> FlinkDeployment example (some fields redacted):
> {code:java}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   creationTimestamp: '2023-03-23T18:27:02Z'
>   deletionGracePeriodSeconds: 0
>   deletionTimestamp: '2023-03-23T18:27:35Z'
>   finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>   generation: 3
>   name: 
>   namespace: 
>   resourceVersion: '10565277081'
>   uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
> spec:
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: '2'
>   flinkVersion: v1_16
>   image: 
>   job:
>     args: []
>     entryClass: 
>     jarURI: 
>     parallelism: 2
>     state: running
>     upgradeMode: stateless
>   jobManager:
>     replicas: 1
>     resource:
>       cpu: 1
>       memory: 2048m
>   logConfiguration:
>     log4j-console.properties: |
>       # This affects logging for both user code and Flink
>       rootLogger.level = INFO
>       rootLogger.appenderRef.console.ref = ConsoleAppender
>       rootLogger.appenderRef.rolling.ref = RollingFileAppender
>       # Uncomment this if you want to _only_ change Flink's logging
>       #logger.flink.name = org.apache.flink
>       #logger.flink.level = INFO
>       # The following lines keep the log level of common libraries/connectors on
>       # log level INFO. The root logger does not override this. You have to manually
>       # change the log levels here.
>       logger.akka.name = akka
>       logger.akka.level = INFO
>       logger.kafka.name = org.apache.kafka
>       logger.kafka.level = INFO
>       logger.hadoop.name = org.apache.hadoop
>       logger.hadoop.level = INFO
>       logger.zookeeper.name = org.apache.zookeeper
>       logger.zookeeper.level = INFO
>       # Log all infos to the console
>       appender.console.name = ConsoleAppender
>       appender.console.type = CONSOLE
>       appender.console.layout.type = PatternLayout
>       appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>       # Log all infos in the given rolling file
>       appender.rolling.name = RollingFileAppender
>       appender.rolling.type = RollingFile
>       appender.rolling.append = false
>       appender.rolling.fileName = ${sys:log.file}
>       appender.rolling.filePattern = ${sys:log.file}.%i
>       appender.rolling.layout.type = PatternLayout
>       appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>       appender.rolling.policies.type = Policies
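
A hedged example of the manual cleanup mentioned in the report (resource and namespace names are placeholders):
{code:bash}
# Clear the stuck finalizer so the namespace deletion can complete
kubectl patch flinkdeployment <deployment-name> -n <namespace> \
  --type=merge -p '{"metadata":{"finalizers":null}}'
{code}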

[GitHub] [flink] clownxc opened a new pull request, #22555: [FLINK-31706] [runtime] The default source parallelism should be the same as ex…

2023-05-09 Thread via GitHub


clownxc opened a new pull request, #22555:
URL: https://github.com/apache/flink/pull/22555

   
   ## What is the purpose of the change
   
   Currently, sources need to set
`execution.batch.adaptive.auto-parallelism.default-source-parallelism` in adaptive batch
scheduler mode; otherwise, the source parallelism defaults to 1. A better solution would be
to fall back to the execution's default parallelism when the user has not configured this
option.
   
   ## Brief change log
   
   Modified the default value of source-parallelism to be more reasonable.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`AdaptiveBatchSchedulerTest` 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #587: [FLINK-31717] Fix unit tests using local kube config

2023-05-09 Thread via GitHub


gyfora commented on PR #587:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/587#issuecomment-1540426062

   I will merge this once I have cut the 1.5 release branch hopefully tomorrow


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31963) java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned checkpoints

2023-05-09 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720610#comment-17720610
 ] 

Piotr Nowojski edited comment on FLINK-31963 at 5/9/23 3:23 PM:


So far I was not able to reproduce this :(

Additionally to what [~srichter] asked. [~tanee.kim], would it be possible for 
you to provide the checkpoint files from when the failure was happening, so 
that we could reproduce it more easily? 

Secondly, a random guess. Can someone verify if setting 
{{execution.checkpointing.unaligned.max-subtasks-per-channel-state-file}} to 1 
stops this issue from reoccurring? [1]

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-unaligned-max-subtasks-per-channel-state
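
A minimal flink-conf.yaml sketch of the suggested verification step:
{code:yaml}
# One subtask per channel state file (the mitigation suggested above, to verify)
execution.checkpointing.unaligned.max-subtasks-per-channel-state-file: 1
{code}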


was (Author: pnowojski):
Additionally to what [~srichter] asked. [~tanee.kim], would it be possible for 
you to provide the checkpoint files from when the failure was happening, so 
that we could reproduce it more easily? So far I was not able to reproduce this 
:(

> java.lang.ArrayIndexOutOfBoundsException when scaling down with unaligned 
> checkpoints
> -
>
> Key: FLINK-31963
> URL: https://issues.apache.org/jira/browse/FLINK-31963
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0
> Environment: Flink: 1.17.0
> FKO: 1.4.0
> StateBackend: RocksDB(Genetic Incremental Checkpoint & Unaligned Checkpoint 
> enabled)
>Reporter: Tan Kim
>Priority: Critical
>  Labels: stability
> Attachments: image-2023-04-29-02-49-05-607.png, jobmanager_error.txt, 
> taskmanager_error.txt
>
>
> I'm testing Autoscaler through Kubernetes Operator and I'm facing the 
> following issue.
> As you know, when a job is scaled down through the autoscaler, the job 
> manager and task manager go down and then back up again.
> When this happens, an index out of bounds exception is thrown and the state 
> is not restored from a checkpoint.
> [~gyfora] told me via the Flink Slack troubleshooting channel that this is 
> likely an issue with Unaligned Checkpoint and not an issue with the 
> autoscaler, but I'm opening a ticket with Gyula for more clarification.
> Please see the attached JM and TM error logs.
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32038) OffsetCommitMode.Kafka_periodic with checkpointing enabled

2023-05-09 Thread Matt Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720966#comment-17720966
 ] 

Matt Wang commented on FLINK-32038:
---

[~pritam.agarwala] Do you mean you want to use the `enable.auto.commit` 
configuration to decide whether to commit offsets when 
`consumer.setCommitOffsetsOnCheckpoints` is set to false?

> OffsetCommitMode.Kafka_periodic with checkpointing enabled 
> ---
>
> Key: FLINK-32038
> URL: https://issues.apache.org/jira/browse/FLINK-32038
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Affects Versions: 1.14.6
>Reporter: Pritam Agarwala
>Priority: Major
>
> I need the Kafka lag to prepare a graph, and it depends on the offsets
> committed to Kafka. Flink updates these offsets only after checkpointing, to
> keep them consistent.
> Default behaviour as per the docs:
> If checkpointing is enabled but {{consumer.setCommitOffsetsOnCheckpoints}} is
> set to false, then offsets will not be committed at all, even if
> {{enable.auto.commit}} is set to true.
> So, when {{consumer.setCommitOffsetsOnCheckpoints}} is set to false,
> *shouldn't it fall back on {{enable.auto.commit}} to commit offsets regularly,
> since in any case Flink doesn't use the consumer's committed offsets for
> recovery?*
>  
> OffsetCommitModes class :
>   
> {code:java}
> public class OffsetCommitModes {
>     /**
>      * Determine the offset commit mode using several configuration values.
>      *
>      * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka
>      *     properties.
>      * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
>      * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
>      * @return the offset commit mode to use, based on the configuration values.
>      */
>     public static OffsetCommitMode fromConfiguration(
>             boolean enableAutoCommit,
>             boolean enableCommitOnCheckpoint,
>             boolean enableCheckpointing) {
>         if (enableCheckpointing) {
>             // if checkpointing is enabled, the mode depends only on whether committing on
>             // checkpoints is enabled
>             return (enableCommitOnCheckpoint)
>                     ? OffsetCommitMode.ON_CHECKPOINTS
>                     : OffsetCommitMode.DISABLED;
>         } else {
>             // else, the mode depends only on whether auto committing is enabled in the provided
>             // Kafka properties
>             return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
>         }
>     }
> }
> {code}
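
To illustrate the current behaviour described above (as implemented by the code just quoted, not a proposed change):
{code:java}
// Checkpointing enabled + commit-on-checkpoints disabled => no commits at all;
// enable.auto.commit is ignored once checkpointing is on.
OffsetCommitMode withCheckpointing =
        OffsetCommitModes.fromConfiguration(true, false, true);
// withCheckpointing == OffsetCommitMode.DISABLED

// Without checkpointing, the same flags fall back to Kafka's periodic auto-commit.
OffsetCommitMode withoutCheckpointing =
        OffsetCommitModes.fromConfiguration(true, false, false);
// withoutCheckpointing == OffsetCommitMode.KAFKA_PERIODIC
{code}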



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #587: [FLINK-31717] Fix unit tests using local kube config

2023-05-09 Thread via GitHub


gyfora commented on PR #587:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/587#issuecomment-1540352291

   looking at the test now @mateczagany I agree that this is 99% covered by the 
existing e2e-s. I suggest we go ahead with your proposed changes :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32041) flink-kubernetes-operator RoleBinding for Leases not created in correct namespace when using watchNamespaces

2023-05-09 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720964#comment-17720964
 ] 

Gyula Fora commented on FLINK-32041:


A current workaround for this would be to add the operator's own namespace to 
the list of watched namespaces. That would set up the roles correctly in that 
namespace as well :) 
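
A values.yaml sketch of that workaround (namespace names are examples):
{code:yaml}
watchNamespaces:
  - flink-operator-ns   # the operator's own namespace, so the Lease RBAC is created here too
  - team-a-flink-jobs
  - team-b-flink-jobs
{code}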

> flink-kubernetes-operator RoleBinding for Leases not created in correct 
> namespace when using watchNamespaces
> 
>
> Key: FLINK-32041
> URL: https://issues.apache.org/jira/browse/FLINK-32041
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Andrew Otto
>Assignee: Andrew Otto
>Priority: Major
>
> When enabling [HA for 
> flink-kubernetes-operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability]
>  RBAC rules must be created to allow the flink-operator to manage k8s Lease 
> resources.  When not using {{watchNamespaces}}, the RBAC rules are 
> created at the k8s cluster level scope, giving the flink-operator 
> ServiceAccount the ability to manage all needed k8s resources for all 
> namespaces.
> However, when using {{watchNamespaces}}, RBAC rules are only created in 
> the {{watchNamespaces}}.  For most rules, this is correct, as the operator 
> needs to manage resources like Flink pods and deployments in the 
> {{watchNamespaces}}.  
> However, for flink-kubernetes-operator HA, the Lease resource is managed in 
> the same namespace in which the operator is deployed.  
> The Helm chart should be fixed so that the proper RBAC rules for Leases are 
> created to allow the operator's ServiceAccount in the operator's namespace.
> Mailing list discussion 
> [here.|https://lists.apache.org/thread/yq89jm0szkcodfocm5x7vqnqdmh0h1l0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32041) flink-kubernetes-operator RoleBinding for Leases not created in correct namespace when using watchNamespaces

2023-05-09 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-32041:
--

Assignee: Andrew Otto

> flink-kubernetes-operator RoleBinding for Leases not created in correct 
> namespace when using watchNamespaces
> 
>
> Key: FLINK-32041
> URL: https://issues.apache.org/jira/browse/FLINK-32041
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Andrew Otto
>Assignee: Andrew Otto
>Priority: Major
>
> When enabling [HA for 
> flink-kubernetes-operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability]
>  RBAC rules must be created to allow the flink-operator to manage k8s Lease 
> resources.  When not using {{watchNamespaces}}, the RBAC rules are 
> created at the k8s cluster level scope, giving the flink-operator 
> ServiceAccount the ability to manage all needed k8s resources for all 
> namespaces.
> However, when using {{watchNamespaces}}, RBAC rules are only created in 
> the {{watchNamespaces}}.  For most rules, this is correct, as the operator 
> needs to manage resources like Flink pods and deployments in the 
> {{watchNamespaces}}.  
> However, for flink-kubernetes-operator HA, the Lease resource is managed in 
> the same namespace in which the operator is deployed.  
> The Helm chart should be fixed so that the proper RBAC rules for Leases are 
> created to allow the operator's ServiceAccount in the operator's namespace.
> Mailing list discussion 
> [here.|https://lists.apache.org/thread/yq89jm0szkcodfocm5x7vqnqdmh0h1l0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] pnowojski commented on pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-09 Thread via GitHub


pnowojski commented on PR #22432:
URL: https://github.com/apache/flink/pull/22432#issuecomment-1540337849

   One more thing, have you maybe manually confirmed that the bug fix is 
working after all of the changes in this PR? Just to double check your unit 
tests are indeed testing the right thing :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] MartijnVisser commented on pull request #22546: [FLINK-32032] Upgrade to flink-shaded 17.0

2023-05-09 Thread via GitHub


MartijnVisser commented on PR #22546:
URL: https://github.com/apache/flink/pull/22546#issuecomment-1540301776

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] echauchot opened a new pull request, #646: Update javadoc urls to 1.16 in the last blog article

2023-05-09 Thread via GitHub


echauchot opened a new pull request, #646:
URL: https://github.com/apache/flink-web/pull/646

   @zentol I noticed that for some reason some javadoc URLs in the last article 
were pointing to Flink 1.12; updating them to 1.16


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32041) flink-kubernetes-operator RoleBinding for Leases not created in correct namespace when using watchNamespaces

2023-05-09 Thread Andrew Otto (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Otto updated FLINK-32041:

Summary: flink-kubernetes-operator RoleBinding for Leases not created in 
correct namespace when using watchNamespaces  (was: flink-kubernetes-operator 
RoleBinding for Leases not created in correct namespace when using 
watchedNamespaces)

> flink-kubernetes-operator RoleBinding for Leases not created in correct 
> namespace when using watchNamespaces
> 
>
> Key: FLINK-32041
> URL: https://issues.apache.org/jira/browse/FLINK-32041
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Andrew Otto
>Priority: Major
>
> When enabling [HA for 
> flink-kubernetes-operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability]
>  a RBAC rules must be created to allow the flink-operator to manage k8s Lease 
> resources.  When not using watchNamespaces, the RBAC rules are created at the 
> k8s cluster level scope, giving the flink-operator ServiceAccount the ability 
> to manage all needed k8s resources for all namespaces.
> However, when using watchNamespaces, RBAC rules are only created in the 
> watchNamepaces.  For most rules, this is correct, as the operator needs to 
> manage resources like Flink pods and deployments in the watchNamespaces.  
> However, For flink-kubernetes-operator HA, the Lease resource is managed in 
> the same namespace in which the operator is deployed.  
> The Helm chart should be fixed so that the proper RBAC rules for Leases are 
> created to allow the operator's ServiceAccount in the operator's namespace.
> Mailing list discussion 
> [here.|https://lists.apache.org/thread/yq89jm0szkcodfocm5x7vqnqdmh0h1l0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32041) flink-kubernetes-operator RoleBinding for Leases not created in correct namespace when using watchNamespaces

2023-05-09 Thread Andrew Otto (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Otto updated FLINK-32041:

Description: 
When enabling [HA for 
flink-kubernetes-operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability]
 RBAC rules must be created to allow the flink-operator to manage k8s Lease 
resources.  When not using {{watchNamespaces}}, the RBAC rules are created 
at the k8s cluster level scope, giving the flink-operator ServiceAccount the 
ability to manage all needed k8s resources for all namespaces.

However, when using {{watchNamespaces}}, RBAC rules are only created in the 
{{watchNamespaces}}.  For most rules, this is correct, as the operator needs 
to manage resources like Flink pods and deployments in the 
{{watchNamespaces}}.  

However, for flink-kubernetes-operator HA, the Lease resource is managed in the 
same namespace in which the operator is deployed.  

The Helm chart should be fixed so that the proper RBAC rules for Leases are 
created to allow the operator's ServiceAccount in the operator's namespace.

Mailing list discussion 
[here.|https://lists.apache.org/thread/yq89jm0szkcodfocm5x7vqnqdmh0h1l0]

  was:
When enabling [HA for 
flink-kubernetes-operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability]
 a RBAC rules must be created to allow the flink-operator to manage k8s Lease 
resources.  When not using watchNamespaces, the RBAC rules are created at the 
k8s cluster level scope, giving the flink-operator ServiceAccount the ability 
to manage all needed k8s resources for all namespaces.

However, when using watchNamespaces, RBAC rules are only created in the 
watchNamepaces.  For most rules, this is correct, as the operator needs to 
manage resources like Flink pods and deployments in the watchNamespaces.  

However, For flink-kubernetes-operator HA, the Lease resource is managed in the 
same namespace in which the operator is deployed.  

The Helm chart should be fixed so that the proper RBAC rules for Leases are 
created to allow the operator's ServiceAccount in the operator's namespace.

Mailing list discussion 
[here.|https://lists.apache.org/thread/yq89jm0szkcodfocm5x7vqnqdmh0h1l0]


> flink-kubernetes-operator RoleBinding for Leases not created in correct 
> namespace when using watchNamespaces
> 
>
> Key: FLINK-32041
> URL: https://issues.apache.org/jira/browse/FLINK-32041
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Andrew Otto
>Priority: Major
>
> When enabling [HA for 
> flink-kubernetes-operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability]
>  RBAC rules must be created to allow the flink-operator to manage k8s Lease 
> resources.  When not using {{watchNamespaces}}, the RBAC rules are 
> created at the k8s cluster level scope, giving the flink-operator 
> ServiceAccount the ability to manage all needed k8s resources for all 
> namespaces.
> However, when using {{watchNamespaces}}, RBAC rules are only created in 
> the {{watchNamespaces}}.  For most rules, this is correct, as the operator 
> needs to manage resources like Flink pods and deployments in the 
> {{watchNamespaces}}.  
> However, for flink-kubernetes-operator HA, the Lease resource is managed in 
> the same namespace in which the operator is deployed.  
> The Helm chart should be fixed so that the proper RBAC rules for Leases are 
> created to allow the operator's ServiceAccount in the operator's namespace.
> Mailing list discussion 
> [here.|https://lists.apache.org/thread/yq89jm0szkcodfocm5x7vqnqdmh0h1l0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32041) flink-kubernetes-operator RoleBinding for Leases not created in correct namespace when using watchedNamespaces

2023-05-09 Thread Andrew Otto (Jira)
Andrew Otto created FLINK-32041:
---

 Summary: flink-kubernetes-operator RoleBinding for Leases not 
created in correct namespace when using watchedNamespaces
 Key: FLINK-32041
 URL: https://issues.apache.org/jira/browse/FLINK-32041
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.4.0
Reporter: Andrew Otto


When enabling [HA for 
flink-kubernetes-operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability]
 a RBAC rules must be created to allow the flink-operator to manage k8s Lease 
resources.  When not using watchNamespaces, the RBAC rules are created at the 
k8s cluster level scope, giving the flink-operator ServiceAccount the ability 
to manage all needed k8s resources for all namespaces.

However, when using watchNamespaces, RBAC rules are only created in the 
watchNamepaces.  For most rules, this is correct, as the operator needs to 
manage resources like Flink pods and deployments in the watchNamespaces.  

However, For flink-kubernetes-operator HA, the Lease resource is managed in the 
same namespace in which the operator is deployed.  

The Helm chart should be fixed so that the proper RBAC rules for Leases are 
created to allow the operator's ServiceAccount in the operator's namespace.

Mailing list discussion 
[here.|https://lists.apache.org/thread/yq89jm0szkcodfocm5x7vqnqdmh0h1l0]
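
A sketch of the RBAC the fix should produce in the operator's own namespace (names are examples, not the actual chart output):
{code:yaml}
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-operator-lease-role
  namespace: flink-operator-ns   # where the operator itself is deployed
rules:
  - apiGroups: ["coordination.k8s.io"]
    resources: ["leases"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
{code}
A matching RoleBinding to the operator's ServiceAccount would be needed alongside it.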



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] mateczagany commented on pull request #587: [FLINK-31717] Fix unit tests using local kube config

2023-05-09 Thread via GitHub


mateczagany commented on PR #587:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/587#issuecomment-1540221341

   I thought this test case was covered by the end-to-end tests.
   
   I wanted to make sure that the user won't accidentally make changes to the 
Kubernetes cluster that is defined in `~/.kube/config` by simply running the 
tests, and this test case does exactly that. 
   
   I know that it would require specifically setting `it.skip` to false, but 
removing this test allowed me to remove minikube from the CI step, making sure 
all new tests added in the future will not accidentally make changes to the 
user's Kubernetes server.
   
   I have some other solutions in mind if the current solution does not seem 
fine:
   - Restore `FlinkOperatorITCase.java` and run this test in the e2e job of the 
CI workflow
   - Restore `FlinkOperatorITCase.java` and revert changes to `ci.yml`, but then 
we risk adding tests in the future that make changes to the user's Kubernetes 
server


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #22432: [FLINK-18808][runtime] Include side outputs in numRecordsOut metric

2023-05-09 Thread via GitHub


pnowojski commented on code in PR #22432:
URL: https://github.com/apache/flink/pull/22432#discussion_r1188607839


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithChainingCheck.java:
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * This is a wrapper for outputs to check whether the collected record has 
been emitted to a
+ * downstream subtask or to a chained operator.
+ */
+@Internal
+public interface OutputWithChainingCheck<OUT> extends WatermarkGaugeExposingOutput<OUT> {
+    /**
+     * @return true if the collected record has been emitted to a downstream subtask. Otherwise,
+     *     false.
+     */
+    boolean collectAndCheckIfCountNeeded(OUT record);

Review Comment:
   nitty nit: rename to `collectAndCheckIfChained`?



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .addAdditionalOutput(partitionWriters)
+                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
+                        .addNonChainedOutputsCount(
+                                new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
+                        .addNonChainedOutputsCount(1)

Review Comment:
   Why do we need to pass a tag for one but not for the other? 🤔 



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java:
##
@@ -996,6 +1002,88 @@ public void testCanEmitBatchOfRecords() throws Exception {
 }
 }
 
+    @Test
+    public void testTaskSideOutputStatistics() throws Exception {
+        TaskMetricGroup taskMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
+        for (int i = 0; i < partitionWriters.length; ++i) {
+            partitionWriters[i] =
+                    new RecordOrEventCollectingResultPartitionWriter<>(
+                            new ArrayDeque<>(),
+                            new StreamElementSerializer<>(
+                                    BasicTypeInfo.INT_TYPE_INFO.createSerializer(
+                                            new ExecutionConfig())));
+            partitionWriters[i].setup();
+        }
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+

[GitHub] [flink] flinkbot commented on pull request #22554: [BP-1.17][FLINK-32027][runtime] Fix the potential concurrent reading bug of index file for SortMergeShuffle.

2023-05-09 Thread via GitHub


flinkbot commented on PR #22554:
URL: https://github.com/apache/flink/pull/22554#issuecomment-1540184679

   
   ## CI report:
   
   * d767ea7cb668fe42c34dbd02c42b66df10cc8788 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22553: [BP-1.16][FLINK-32027][runtime] Fix the potential concurrent reading bug of index file for SortMergeShuffle.

2023-05-09 Thread via GitHub


flinkbot commented on PR #22553:
URL: https://github.com/apache/flink/pull/22553#issuecomment-1540184372

   
   ## CI report:
   
   * 7bcad268c59287f22b05b5e6c8cb5121323169e9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa opened a new pull request, #22554: [BP-1.17][FLINK-32027][runtime] Fix the potential concurrent reading bug of index file for SortMergeShuffle.

2023-05-09 Thread via GitHub


reswqa opened a new pull request, #22554:
URL: https://github.com/apache/flink/pull/22554

   Backport FLINK-32027 to release-1.17.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa opened a new pull request, #22553: [BP-1.16][FLINK-32027][runtime] Fix the potential concurrent reading bug of index file for SortMergeShuffle.

2023-05-09 Thread via GitHub


reswqa opened a new pull request, #22553:
URL: https://github.com/apache/flink/pull/22553

   Backport FLINK-32027 to release-1.16.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Tom-Goong closed pull request #8193: [hotfix] load globalConfiguration from configurationDir

2023-05-09 Thread via GitHub


Tom-Goong closed pull request #8193: [hotfix] load globalConfiguration from 
configurationDir
URL: https://github.com/apache/flink/pull/8193


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32027) Batch jobs could hang at shuffle phase when max parallelism is really large

2023-05-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32027:
---
Labels: pull-request-available  (was: )

> Batch jobs could hang at shuffle phase when max parallelism is really large
> ---
>
> Key: FLINK-32027
> URL: https://issues.apache.org/jira/browse/FLINK-32027
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0, 1.16.1
>Reporter: Yun Tang
>Assignee: Weijie Guo
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.17.1
>
> Attachments: image-2023-05-08-11-12-58-361.png
>
>
> In batch mode with the adaptive batch scheduler, if we set the max 
> parallelism as large as 32768 (pipeline.max-parallelism), the job can hang at 
> the shuffle phase.
> It hangs for a long time and shows "No bytes sent":
>  !image-2023-05-08-11-12-58-361.png! 
> After some debugging, we can see that the downstream operator did not receive 
> the end-of-partition event.
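
A generic illustration of the class of bug being fixed, not the actual patch: when several readers share one index-file channel, reads must be positional so they do not race on the channel's implicit position:
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

class IndexRegionReader {
    private static final int INDEX_ENTRY_SIZE = 16; // hypothetical fixed-size entry

    // FileChannel.read(ByteBuffer, long) is an absolute read: it never moves the
    // channel's shared position, so concurrent readers can safely share the channel.
    static ByteBuffer readIndexEntry(FileChannel channel, long region) throws IOException {
        ByteBuffer entry = ByteBuffer.allocate(INDEX_ENTRY_SIZE);
        long offset = region * INDEX_ENTRY_SIZE;
        while (entry.hasRemaining()) {
            if (channel.read(entry, offset + entry.position()) < 0) {
                throw new IOException("Unexpected end of index file");
            }
        }
        entry.flip();
        return entry;
    }
}
{code}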



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa closed pull request #22549: [FLINK-32027] Fix the potential concurrent reading bug of index file for SortMergeShuffle.

2023-05-09 Thread via GitHub


reswqa closed pull request #22549: [FLINK-32027] Fix the potential concurrent 
reading bug of index file for SortMergeShuffle.
URL: https://github.com/apache/flink/pull/22549


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #22548: [FLINK-32039][test] Adds graceful shutdown to TestExecutorExtension and TestExecutorResource

2023-05-09 Thread via GitHub


zentol commented on code in PR #22548:
URL: https://github.com/apache/flink/pull/22548#discussion_r1188563452


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java:
##
@@ -47,8 +58,26 @@ public T getExecutor() {
 
 @Override
 public void afterAll(ExtensionContext context) throws Exception {
+gracefulShutdown(executorService, LOG);
+}
+
+static void gracefulShutdown(@Nullable ExecutorService executorService, 
Logger logger) {

Review Comment:
   I'm a bit worried that this might increase the duration of tests, because 
affected tests now sit idle for 10 seconds.
   Why not just call shutdownNow immediately?
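
   For reference, the usual JDK graceful-then-forced pattern (a sketch with an assumed signature; the PR's actual helper may differ):
   ```java
   import java.time.Duration;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.TimeUnit;

   final class ExecutorShutdown {
       static void gracefulShutdown(ExecutorService executor, Duration timeout)
               throws InterruptedException {
           executor.shutdown(); // reject new tasks, let running ones finish
           if (!executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
               executor.shutdownNow(); // interrupt whatever is still running
           }
       }
   }
   ```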



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mbalassi merged pull request #588: [hotfix] Update CRD compat check to 1.4.0

2023-05-09 Thread via GitHub


mbalassi merged PR #588:
URL: https://github.com/apache/flink-kubernetes-operator/pull/588


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25909) Move HBase token obtain functionality into HBaseDelegationTokenProvider

2023-05-09 Thread Gabor Somogyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720918#comment-17720918
 ] 

Gabor Somogyi commented on FLINK-25909:
---

1. no in order to keep backward compatibility
2. level can be lowered from info to debug

> Move HBase token obtain functionality into HBaseDelegationTokenProvider
> ---
>
> Key: FLINK-25909
> URL: https://issues.apache.org/jira/browse/FLINK-25909
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.15.0
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol commented on a diff in pull request #21349: [FLINK-28203] Support Maven 3.3+

2023-05-09 Thread via GitHub


zentol commented on code in PR #21349:
URL: https://github.com/apache/flink/pull/21349#discussion_r1188557290


##
flink-python/pom.xml:
##


Review Comment:
   Generally speaking, we don't _need_ to. IIRC the s3 filesystem is one of 
those hyper-problematic modules, with loads of dependencies associated with 
test dependencies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-25909) Move HBase token obtain functionality into HBaseDelegationTokenProvider

2023-05-09 Thread Gabor Somogyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720918#comment-17720918
 ] 

Gabor Somogyi edited comment on FLINK-25909 at 5/9/23 12:56 PM:


1. no, in order to keep backward compatibility
2. level can be lowered from info to debug


was (Author: gaborgsomogyi):
1. no in order to keep backward compatibility
2. level can be lowered from info to debug

> Move HBase token obtain functionality into HBaseDelegationTokenProvider
> ---
>
> Key: FLINK-25909
> URL: https://issues.apache.org/jira/browse/FLINK-25909
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.15.0
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] reswqa merged pull request #72: [hotfix][BP-4.1] Python connector download link should refer to the url defined in externalized repository

2023-05-09 Thread via GitHub


reswqa merged PR #72:
URL: https://github.com/apache/flink-connector-aws/pull/72


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] reswqa merged pull request #71: [hotfix] Python connector download link should refer to the url defined in externalized repository

2023-05-09 Thread via GitHub


reswqa merged PR #71:
URL: https://github.com/apache/flink-connector-aws/pull/71


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22552: [FLINK-30585][flamegraph] Collect the flame graph of a single subtask instead of all subtasks when viewing the flame graph of a single subta

2023-05-09 Thread via GitHub


flinkbot commented on PR #22552:
URL: https://github.com/apache/flink/pull/22552#issuecomment-1540053178

   
   ## CI report:
   
   * 5da82557df67d2e215b006b4fd344ddc5c516c72 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30585) Improve flame graph performance at subtask level

2023-05-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30585:
---
Labels: pull-request-available  (was: )

> Improve flame graph performance at subtask level
> 
>
> Key: FLINK-30585
> URL: https://issues.apache.org/jira/browse/FLINK-30585
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task, Runtime / Web Frontend
>Affects Versions: 1.17.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
>
> After FLINK-30185, we can view the flame graph at the subtask level. However, it 
> always collects flame graphs for all subtasks.
> We should collect the flame graph of a single subtask instead of all subtasks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui opened a new pull request, #22552: [FLINK-30585][flamegraph] Collect the flame graph of a single subtask instead of all subtasks when viewing the flame graph of a single su

2023-05-09 Thread via GitHub


1996fanrui opened a new pull request, #22552:
URL: https://github.com/apache/flink/pull/22552

   ## What is the purpose of the change
   
   After [FLINK-30185](https://issues.apache.org/jira/browse/FLINK-30185), we 
can view the flame graph at the subtask level. However, it always collects flame 
graphs for all subtasks.
   
   We should collect the flame graph of a single subtask instead of all subtasks.
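   
   A hedged example of the resulting request (endpoint shape and query parameter are assumed for illustration):
   ```bash
   # Ask the REST API for the flame graph of subtask 3 only, instead of all subtasks
   curl "http://<jobmanager>:8081/jobs/<job-id>/vertices/<vertex-id>/flamegraph?subtaskindex=3"
   ```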
   
   ## Brief change log
   
   [FLINK-30585][flamegraph] Collect the flame graph of a single subtask 
instead of all subtasks when viewing the flame graph of a single subtask
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
   It's still developing.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive):no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector:no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented?  not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] reswqa commented on pull request #71: [hotfix] Python connector download link should refer to the url defined in externalized repository

2023-05-09 Thread via GitHub


reswqa commented on PR #71:
URL: 
https://github.com/apache/flink-connector-aws/pull/71#issuecomment-1540024159

   > Right now thought it says ["Only available for stable 
versions."](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/)
 so I think it is ok
   
   Yes, make sense.
   
   Thanks for the quick review. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


