Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2024-01-14 Thread via GitHub


gyfora commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1452023246


##
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java:
##
@@ -17,55 +17,82 @@
 
 package org.apache.flink.kubernetes.operator.admission.mutator;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 
 /** The default mutator. */
 public class FlinkMutator implements Mutator {
 private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMutator.class);
 private static final ObjectMapper mapper = new ObjectMapper();
+private final Set<FlinkResourceMutator> mutators;
+private final InformerManager informerManager;
+
+public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager informerManager) {
+this.mutators = mutators;
+this.informerManager = informerManager;
+}
 
 @Override
 public HasMetadata mutate(HasMetadata resource, Operation operation)
 throws NotAllowedException {
-if (operation == Operation.CREATE) {
+if (operation == Operation.CREATE || operation == Operation.UPDATE) {
 LOG.debug("Mutating resource {}", resource);
-
 if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-try {
-var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
-setSessionTargetLabel(sessionJob);
-return sessionJob;
-} catch (Exception e) {
-throw new RuntimeException(e);
-}
+return mutateSessionJob(resource);
+}
+if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) 
{
+return mutateDeployment(resource);
 }
 }
 return resource;
 }
 
-private void setSessionTargetLabel(FlinkSessionJob flinkSessionJob) {
-var labels = flinkSessionJob.getMetadata().getLabels();
-if (labels == null) {
-labels = new HashMap<>();
+private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+try {
+var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
+var namespace = sessionJob.getMetadata().getNamespace();
+var deploymentName = sessionJob.getSpec().getDeploymentName();
+var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+var deployment =
+
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+for (FlinkResourceMutator mutator : mutators) {
+FlinkSessionJob flinkSessionJob =
+mutator.mutateSessionJob(sessionJob, 
Optional.ofNullable(deployment));
+sessionJob = flinkSessionJob;

Review Comment:
   Simplify to:
   ```java
   sessionJob = mutator.mutateSessionJob(sessionJob, Optional.ofNullable(deployment));
   ```
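
For illustration, a custom mutator plugged into this set might look roughly like the sketch below. It assumes the `FlinkResourceMutator` interface exposes `mutateDeployment` and `mutateSessionJob` hooks as used in the diff above; the class name and label are hypothetical.

```java
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;

import io.fabric8.kubernetes.api.model.ObjectMeta;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical custom mutator that stamps a "team" label on every resource. */
public class TeamLabelMutator implements FlinkResourceMutator {

    @Override
    public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
        addTeamLabel(deployment.getMetadata());
        return deployment;
    }

    @Override
    public FlinkSessionJob mutateSessionJob(
            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
        addTeamLabel(sessionJob.getMetadata());
        return sessionJob;
    }

    private static void addTeamLabel(ObjectMeta metadata) {
        // Labels may be null on freshly submitted resources.
        Map<String, String> labels = metadata.getLabels();
        if (labels == null) {
            labels = new HashMap<>();
            metadata.setLabels(labels);
        }
        labels.putIfAbsent("team", "data-platform");
    }
}
```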



[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Khanh Vu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Khanh Vu updated FLINK-34076:
-
Description: 
The following issue occurs with flink-connector-kinesis v4.2.0 and Flink 1.17; it 
works properly with kinesis connector v4.1.0 (I have not tested versions before 
v4.1.0).

The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused the kinesis sink to fail to be created when using the Table API, as required 
classes from `flink-connector-base` are not loaded at runtime.

E.g. with only the following dependency in pom.xml
{code:xml}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>${flink.connector.kinesis.version}</version>
</dependency>
{code}

and a minimal job definition:

{code:java}
public static void main(String[] args) throws Exception {
// create data stream environment
StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(sEnv);

Schema a = Schema.newBuilder().column("a", 
DataTypes.STRING()).build();
TableDescriptor descriptor =
TableDescriptor.forConnector("kinesis")
.schema(a)
.format("json")
.build();
tEnv.createTemporaryTable("sinkTable", descriptor);

tEnv.executeSql("CREATE TABLE sinkTable " + 
descriptor.toString()).print();
}
{code}

the following exception will be thrown:
{code:java}
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
at 
jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) 
~[?:?]
at 
jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
 ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
... 28 more
{code}

The fix is to explicitly specify `flink-connector-base` as a dependency of the 
project:

{code:xml}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>${flink.connector.kinesis.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
{code}

In general, `flink-connector-base` should be pulled in by default when pulling 
in the kinesis connector; the current separation adds unnecessary hassle when 
using the connector.


[jira] [Commented] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Khanh Vu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806669#comment-17806669
 ] 

Khanh Vu commented on FLINK-34076:
--

Hi [~jiabao.sun],
Yes, if I have `flink-connector-base` in the dependency list, it runs properly 
(that is the fix I mentioned), but if I leave the base out, it fails. Before the 
aforementioned commit, I only needed `flink-connector-kinesis` in the list (without 
`flink-connector-base`).

It's actually incorrect for the connector to depend on `flink-connector-base` at 
runtime but not declare it as a (transitive) dependency.

> flink-connector-base missing fails kinesis table sink to create
> ---
>
> Key: FLINK-34076
> URL: https://issues.apache.org/jira/browse/FLINK-34076
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Khanh Vu
>Priority: Major
> Attachments: screenshot-1.png
>
>
> The 
> [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
>  which stops bundling `flink-connector-base` with `flink-connector-kinesis` 
> has caused the kinesis sink to fail to be created when using the Table API, as 
> required classes from `flink-connector-base` are not loaded at runtime.
> E.g. with only the following dependency in pom.xml
> {code:xml}
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis</artifactId>
>     <version>${flink.connector.kinesis.version}</version>
> </dependency>
> {code}
> and a minimal job definition:
> {code:java}
>   public static void main(String[] args) throws Exception {
>   // create data stream environment
>   StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>   StreamTableEnvironment tEnv = 
> StreamTableEnvironment.create(sEnv);
>   Schema a = Schema.newBuilder().column("a", 
> DataTypes.STRING()).build();
>   TableDescriptor descriptor =
>   TableDescriptor.forConnector("kinesis")
>   .schema(a)
>   .format("json")
>   .build();
>   tEnv.createTemporaryTable("sinkTable", descriptor);
>   tEnv.executeSql("CREATE TABLE sinkTable " + 
> descriptor.toString()).print();
>   }
> {code}
> the following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
>   at 
> jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) 
> ~[?:?]
>   at 
> jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>  ~[?:?]
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
>   ... 28 more
> {code}
> The fix is to explicitly specify `flink-connector-base` as a dependency of the 
> project:
> {code:xml}
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis</artifactId>
>     <version>${flink.connector.kinesis.version}</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-base</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
> </dependency>
> {code}
> In general, `flink-connector-base` should be pulled in by default when 
> pulling in the kinesis connector; the current separation adds unnecessary 
> hassle when using the connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33268][rest] Skip unknown fields in REST response deserialization [flink]

2024-01-14 Thread via GitHub


gyfora commented on PR #23930:
URL: https://github.com/apache/flink/pull/23930#issuecomment-1891493012

   Looks good @gaborgsomogyi  
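
   For reference, skipping unknown fields in Jackson deserialization typically comes down to one mapper setting. A minimal standalone sketch (not the PR's actual code; the response type is hypothetical, and record support assumes Jackson 2.12+):

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LenientRestMapperExample {

    // Hypothetical response type; Flink's real REST responses are its own POJOs.
    record JobStatus(String id) {}

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper =
                new ObjectMapper()
                        // Ignore response fields this client does not know about,
                        // e.g. fields added by a newer server version.
                        .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);

        // The extra "state" field would otherwise fail deserialization.
        JobStatus status =
                mapper.readValue("{\"id\":\"abc\",\"state\":\"RUNNING\"}", JobStatus.class);
        System.out.println(status.id());
    }
}
```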


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33998) Flink Job Manager restarted after kube-apiserver connection intermittent

2024-01-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806664#comment-17806664
 ] 

Matthias Pohl commented on FLINK-33998:
---

I couldn't find anything that sounds related to your issue in the release notes 
of [Flink 
1.14.0|https://nightlies.apache.org/flink/flink-docs-release-1.18/release-notes/flink-1.14/#runtime--coordination].
 A more detailed overview of the changes is possible by browsing through all 
the changes of the [individual 1.14.x 
releases|https://issues.apache.org/jira/projects/FLINK?selectedItem=com.atlassian.jira.jira-projects-plugin:release-page=released=1.14].
 But that's quite tedious.

> Flink Job Manager restarted after kube-apiserver connection intermittent
> 
>
> Key: FLINK-33998
> URL: https://issues.apache.org/jira/browse/FLINK-33998
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.6
> Environment: Kubernetes 1.24
> Flink Operator 1.4
> Flink 1.13.6
>Reporter: Xiangyan
>Priority: Major
> Attachments: audit-log-no-restart.txt, audit-log-restart.txt, 
> connection timeout.png, jm-no-restart4.log, jm-restart4.log
>
>
> We are running Flink on AWS EKS and experienced Job Manager restart issue 
> when EKS control plane scaled up/in.
> I can reproduce this issue in my local environment too.
> Since I have no control over the EKS kube-apiserver, I built a Kubernetes cluster 
> on my own with the setup below:
>  * Two kube-apiserver, only one is running at a time;
>  * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
>  * Enable Flink Job Manager HA;
>  * Configure Job Manager leader election timeout;
> {code:java}
> high-availability.kubernetes.leader-election.lease-duration: "60s"
> high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
> For testing, I switch the running kube-apiserver from one instance to another 
> each time. When the kube-apiserver is switching, I can see that some Job 
> Managers restart, but some are still running normally.
> Here is an example. When the kube-apiserver switched over at 
> 05:{color:#ff}{{*53*}}{color}:08, both JMs lost their connection to the 
> kube-apiserver, but there were no further connection errors within a few seconds. 
> I guess the connection recovered by retrying.
> However, one of the JMs (the 2nd one in the attached screenshot) reported a 
> "DefaultDispatcherRunner was revoked the leadership" error after the leader 
> election timeout (at 05:{color:#ff}{{*54*}}{color}:08) and then restarted 
> itself, while the other JM was still running normally.
> From the kube-apiserver audit logs, the normal JM was able to renew its leader 
> lease after the interruption, but there was no lease renewal request from the 
> failed JM until it restarted.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-14 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452012771


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java:
##
@@ -69,10 +69,10 @@ default void notifyPriorityEvent(int priorityBufferNumber) 
{}
  * Get the availability and backlog of the view. The availability 
represents if the view is
  * ready to get buffer from it. The backlog represents the number of 
available data buffers.
  *
- * @param numCreditsAvailable the available credits for this {@link 
ResultSubpartitionView}.
+ * @param isCreditAvailable the availability of credits for this {@link 
ResultSubpartitionView}.
  * @return availability and backlog.
  */
-AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable);
+AvailabilityWithBacklog getAvailabilityAndBacklog(boolean 
isCreditAvailable);

Review Comment:
   This is a must-have change, because otherwise `UnionResultSubpartitionView` 
would need to determine how to distribute credits to child views, which is 
unnecessary.
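
   To illustrate the point, a rough sketch (not the PR's actual implementation; class and field names are made up, and the boolean-based signature proposed in this PR is assumed): with a single boolean, the union view can forward the query to each child unchanged and aggregate the results, instead of deciding how many credits each child should get.

```java
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView.AvailabilityWithBacklog;

import java.util.List;

class UnionAvailabilitySketch {
    private final List<ResultSubpartitionView> childViews;

    UnionAvailabilitySketch(List<ResultSubpartitionView> childViews) {
        this.childViews = childViews;
    }

    AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable) {
        boolean anyAvailable = false;
        int totalBacklog = 0;
        for (ResultSubpartitionView child : childViews) {
            // Forward the boolean as-is; no per-child credit accounting is needed.
            AvailabilityWithBacklog result = child.getAvailabilityAndBacklog(isCreditAvailable);
            anyAvailable |= result.isAvailable();
            totalBacklog += result.getBacklog();
        }
        return new AvailabilityWithBacklog(anyAvailable, totalBacklog);
    }
}
```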



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-14 Thread via GitHub


yunfengzhou-hub commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1452009484


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {

Review Comment:
   In the future we may need to dynamically assign subpartitions to an input 
channel at runtime, in which case the indexes of the subpartitions may not be 
adjacent to each other. The IndexSet naming offers an abstraction that is 
general enough for these future uses.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-34078) Move InternalKeyContext classes from o.a.f.runtime.state.heap to o.a.f.runtime.state package

2024-01-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-34078:


Assignee: Jinzhong Li

> Move InternalKeyContext classes from o.a.f.runtime.state.heap to 
> o.a.f.runtime.state package
> 
>
> Key: FLINK-34078
> URL: https://issues.apache.org/jira/browse/FLINK-34078
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Minor
> Attachments: image-2024-01-15-12-57-12-667.png
>
>
> h3. Motivation:
> When the RocksDB state backend throws a keyGroup check illegal exception, 
> the exception stack contains a heap-state-backend-scoped class, which looks 
> strange to users.
> !image-2024-01-15-12-57-12-667.png|width=555,height=68!
> h3. Proposed changes:
> InternalKeyContext and InternalKeyContextImpl are commonly used by all state 
> backends (heap/rocksdb/changelog), they should be moved from 
> org.apache.flink.runtime.state.heap package to org.apache.flink.runtime.state 
> package.
> h3. Compatibility:
> InternalKeyContext is annotated with @Internal, so this change has no 
> compatibility issues.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34081) Refactor all callers that use the public Xxx getXxx(ConfigOption<Xxx> configOption) and public void setXxx(ConfigOption<Xxx> key, Xxx value)

2024-01-14 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-34081:

Description: 
FLINK-34080 deprecates some methods of Configuration; we should refactor all 
callers of the deprecated methods to use the recommended methods.
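
For illustration, a typical call-site change might look like the following sketch (the option is hypothetical; the point is moving from the type-specific accessors to the generic {{get}}/{{set}}):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class ConfigRefactorExample {
    // Hypothetical option used only for this example.
    private static final ConfigOption<Integer> PARALLELISM =
            ConfigOptions.key("example.parallelism").intType().defaultValue(1);

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Before (deprecated by FLINK-34080): type-specific setter/getter.
        conf.setInteger(PARALLELISM, 4);
        int before = conf.getInteger(PARALLELISM);

        // After: the recommended ConfigOption-based accessors.
        conf.set(PARALLELISM, 4);
        int after = conf.get(PARALLELISM);

        System.out.println(before + " " + after);
    }
}
{code}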

 

> Refactor all callers that use the public Xxx getXxx(ConfigOption<Xxx> 
> configOption) and public void setXxx(ConfigOption<Xxx> key, Xxx value) 
> 
>
> Key: FLINK-34081
> URL: https://issues.apache.org/jira/browse/FLINK-34081
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> FLINK-34080 deprecates some methods of Configuration; we should refactor all 
> callers of the deprecated methods to use the recommended methods.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34078) Move InternalKeyContext classes from o.a.f.runtime.state.heap to o.a.f.runtime.state package

2024-01-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806660#comment-17806660
 ] 

Hangxiang Yu commented on FLINK-34078:
--

Thanks for reporting this.

It makes sense to move it to the outer package.

Already assigned to you, please go ahead.

> Move InternalKeyContext classes from o.a.f.runtime.state.heap to 
> o.a.f.runtime.state package
> 
>
> Key: FLINK-34078
> URL: https://issues.apache.org/jira/browse/FLINK-34078
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Minor
> Attachments: image-2024-01-15-12-57-12-667.png
>
>
> h3. Motivation:
> When the RocksDB state backend throws a keyGroup check illegal exception, 
> the exception stack contains a heap-state-backend-scoped class, which looks 
> strange to users.
> !image-2024-01-15-12-57-12-667.png|width=555,height=68!
> h3. Proposed changes:
> InternalKeyContext and InternalKeyContextImpl are commonly used by all state 
> backends (heap/rocksdb/changelog), they should be moved from 
> org.apache.flink.runtime.state.heap package to org.apache.flink.runtime.state 
> package.
> h3. Compatibility:
> InternalKeyContext is annotated with @Internal, so this change has no 
> compatibility issues.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34083) Deprecate string configuration keys and unused constants in ConfigConstants

2024-01-14 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-34083:
---

Assignee: Xuannan Su

> Deprecate string configuration keys and unused constants in ConfigConstants
> ---
>
> Key: FLINK-34083
> URL: https://issues.apache.org/jira/browse/FLINK-34083
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Assignee: Xuannan Su
>Priority: Major
> Fix For: 1.19.0
>
>
> * Update ConfigConstants.java to deprecate and replace string configuration 
> keys
>  * Mark unused constants in ConfigConstants.java as deprecated



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34084) Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat

2024-01-14 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-34084:
---

Assignee: Xuannan Su

> Deprecate unused configuration in BinaryInput/OutputFormat and 
> FileInput/OutputFormat
> -
>
> Key: FLINK-34084
> URL: https://issues.apache.org/jira/browse/FLINK-34084
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Assignee: Xuannan Su
>Priority: Major
> Fix For: 1.19.0
>
>
> Update FileInputFormat.java, FileOutputFormat.java, BinaryInputFormat.java, 
> and BinaryOutputFormat.java to deprecate unused string configuration keys.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-01-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-34050:


Assignee: Jinzhong Li

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>
> FLINK-21321 uses deleteRange to speed up RocksDB rescaling; however, it will 
> cause space amplification in some cases.
> We can reproduce this problem using a wordCount job:
> 1) before rescaling, the state operator in the wordCount job has parallelism 2 
> and a 4G+ full checkpoint size;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart the job with parallelism 4 (for the state operator); the full 
> checkpoint size of the new job will be 8G+;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>  
> The root cause of this issue is that the deleted keyGroupRange does not 
> overlap with the current DB keyGroupRange, so new data written into RocksDB 
> after rescaling almost never does LSM compaction with the deleted data 
> (belonging to other keyGroupRanges).
>  
> And the space amplification may affect Rocksdb read performance and disk 
> space usage after rescaling. It looks like a regression due to the 
> introduction of deleteRange for rescaling optimization.
>  
> To solve this problem, I think maybe we can invoke 
> RocksDB.deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>   //...
>   List<byte[]> ranges = new ArrayList<>();
>   //...
>   deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>   ranges.add(beginKeyGroupBytes);
>   ranges.add(endKeyGroupBytes);
>   //
>   for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>  db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34085) Remove deprecated string configuration keys in Flink 2.0

2024-01-14 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-34085:
---

Assignee: Xuannan Su

> Remove deprecated string configuration keys in Flink 2.0
> 
>
> Key: FLINK-34085
> URL: https://issues.apache.org/jira/browse/FLINK-34085
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Assignee: Xuannan Su
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-01-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806659#comment-17806659
 ] 

Hangxiang Yu commented on FLINK-34050:
--

[~lijinzhong] Yeah, I think this benchmark result should be enough.

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>
> FLINK-21321 uses deleteRange to speed up RocksDB rescaling; however, it will 
> cause space amplification in some cases.
> We can reproduce this problem using a wordCount job:
> 1) before rescaling, the state operator in the wordCount job has parallelism 2 
> and a 4G+ full checkpoint size;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart the job with parallelism 4 (for the state operator); the full 
> checkpoint size of the new job will be 8G+;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>  
> The root cause of this issue is that the deleted keyGroupRange does not 
> overlap with the current DB keyGroupRange, so new data written into RocksDB 
> after rescaling almost never does LSM compaction with the deleted data 
> (belonging to other keyGroupRanges).
>  
> And the space amplification may affect Rocksdb read performance and disk 
> space usage after rescaling. It looks like a regression due to the 
> introduction of deleteRange for rescaling optimization.
>  
> To solve this problem, I think maybe we can invoke 
> RocksDB.deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>   //...
>   List<byte[]> ranges = new ArrayList<>();
>   //...
>   deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>   ranges.add(beginKeyGroupBytes);
>   ranges.add(endKeyGroupBytes);
>   //
>   for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>  db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-24024][table-planner] support session window tvf in plan [flink]

2024-01-14 Thread via GitHub


LadyForest commented on code in PR #23505:
URL: https://github.com/apache/flink/pull/23505#discussion_r1448461452


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##
@@ -234,11 +238,71 @@ object WindowUtil {
 val step = getOperandAsLong(windowCall.operands(1))
 val maxSize = getOperandAsLong(windowCall.operands(2))
 new CumulativeWindowSpec(Duration.ofMillis(maxSize), 
Duration.ofMillis(step), offset)
+  case FlinkSqlOperatorTable.SESSION =>
+val gap = getOperandAsLong(windowCall.operands(1))
+val partitionKeys =
+  exploreSessionWindowPartitionKeys(scanInput)
+new SessionWindowSpec(Duration.ofMillis(gap), partitionKeys)
 }
 
 new TimeAttributeWindowingStrategy(windowSpec, timeAttributeType, 
timeIndex)
   }
 
+  /**
+   * If the session window tvf has partition keys, the whole tree is like:
+   *
+   * {{{

Review Comment:
   Nit
   ```scala
   {{{
  *  TableFunctionScan
  *  |
  *  [Project or Calc]
  *  |
  *   Exchange
  * }}}
   ```



##
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##
@@ -1432,9 +1433,15 @@ private void substituteSubQuery(Blackboard bb, SubQuery 
subQuery) {
 bb.cursors.add(converted.r);
 return;
 case SET_SEMANTICS_TABLE:
-if (!config.isExpand()) {
-return;
-}
+// - FLINK MODIFICATION BEGIN -
+// Currently, Flink will not distinguish tvf between SET 
semantics and ROW
+// semantics.
+// And in Flink, only session window tvf need to support SET 
semantics. It will
+// always be expanded.
+//  if (!config.isExpand()) {
+//  return;
+//  }

Review Comment:
   Can we defer the sub-query rewrite to the physical phase?



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java:
##
@@ -144,7 +145,8 @@ public RelNode visit(RelNode node) {
 || node instanceof FlinkLogicalSort
 || node instanceof FlinkLogicalOverAggregate
 || node instanceof FlinkLogicalExpand
-|| node instanceof FlinkLogicalScriptTransform) {
+|| node instanceof FlinkLogicalScriptTransform
+|| node instanceof FlinkLogicalExchange) {

Review Comment:
   I think we don't need to involve `FlinkLogicalExchange` here if we can defer 
the sub-query rewrite to the physical phase



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java:
##
@@ -249,4 +263,53 @@ private GeneratedNamespaceAggsHandleFunction 
createAggsHandler(
 sliceAssigner,
 shiftTimeZone);
 }
+
+/**
+ * Currently, the operator of WindowAggregate does not support Session 
Window and it needs to
+ * fall back to the legacy GroupWindowAggregate.
+ */
+private boolean shouldFallbackToGroupWindowAgg(WindowSpec windowSpec) {
+return windowSpec instanceof SessionWindowSpec;
+}
+
+private Transformation fallbackToGroupWindowAggregate(
+PlannerBase planner, ExecNodeConfig config) {
+Preconditions.checkState(windowing.getWindow() instanceof 
SessionWindowSpec);
+
+if (windowing instanceof TimeAttributeWindowingStrategy) {
+LogicalType timeAttributeType = windowing.getTimeAttributeType();
+LogicalWindow logicalWindow =
+new SessionGroupWindow(
+new WindowReference("w$", timeAttributeType),
+new FieldReferenceExpression(
+// mock an empty time field name here
+"",
+
fromLogicalTypeToDataType(timeAttributeType),
+0,
+((TimeAttributeWindowingStrategy) 
windowing)
+.getTimeAttributeIndex()),
+intervalOfMillis(
+((SessionWindowSpec) windowing.getWindow())
+.getGap()
+.toMillis()));
+
+StreamExecGroupWindowAggregate groupWindowAggregate =
+new StreamExecGroupWindowAggregate(
+planner.getTableConfig(),
+grouping,
+aggCalls,
+logicalWindow,
+

[jira] [Created] (FLINK-34085) Remove deprecated string configuration keys in Flink 2.0

2024-01-14 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-34085:
--

 Summary: Remove deprecated string configuration keys in Flink 2.0
 Key: FLINK-34085
 URL: https://issues.apache.org/jira/browse/FLINK-34085
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Xuannan Su
 Fix For: 2.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34084) Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat

2024-01-14 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-34084:
--

 Summary: Deprecate unused configuration in 
BinaryInput/OutputFormat and FileInput/OutputFormat
 Key: FLINK-34084
 URL: https://issues.apache.org/jira/browse/FLINK-34084
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Xuannan Su
 Fix For: 1.19.0


Update FileInputFormat.java, FileOutputFormat.java, BinaryInputFormat.java, and 
BinaryOutputFormat.java to deprecate unused string configuration keys.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]

2024-01-14 Thread via GitHub


xishuaidelin commented on code in PR #23827:
URL: https://github.com/apache/flink/pull/23827#discussion_r1451985731


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java:
##
@@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable 
others) throws Exceptio
 assertKeyNotPresent(acc, key);
 acc.map.put(key, other.map.get(key));
 }
+for (final StringData key : other.retractMap.keys()) {

Review Comment:
   Hi xuyang, thanks for your comments. It considers both the key and the 
corresponding value in the comparison. Therefore, I don't foresee any issues 
arising during the merge stage. However, the issue you mentioned could 
potentially occur at the local stage, such as within the retract function. I 
will fix it in the retract function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34083) Deprecate string configuration keys and unused constants in ConfigConstants

2024-01-14 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-34083:
--

 Summary: Deprecate string configuration keys and unused constants 
in ConfigConstants
 Key: FLINK-34083
 URL: https://issues.apache.org/jira/browse/FLINK-34083
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Xuannan Su
 Fix For: 1.19.0


* Update ConfigConstants.java to deprecate and replace string configuration keys
 * Mark unused constants in ConfigConstants.java as deprecated
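
As a rough illustration of the intended pattern (the key and option below are hypothetical, not the actual constants being touched):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class ConfigConstantsStyleExample {

    /** Recommended replacement: a typed ConfigOption. */
    public static final ConfigOption<String> EXAMPLE_DIR =
            ConfigOptions.key("example.dir").stringType().defaultValue("/tmp/example");

    /**
     * Legacy string key, kept only for backwards compatibility.
     *
     * @deprecated Use {@link #EXAMPLE_DIR} instead.
     */
    @Deprecated
    public static final String EXAMPLE_DIR_KEY = "example.dir";

    private ConfigConstantsStyleExample() {}
}
{code}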



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-14 Thread via GitHub


reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1450161095


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##
@@ -291,13 +291,18 @@ public void releaseAllResources() throws IOException {
 }
 
 @Override
-public void notifyDataAvailable() {
+public void notifyDataAvailable(ResultSubpartitionView view) {
 requestQueue.notifyReaderNonEmpty(this);
 }
 
 @Override
 public void notifyPriorityEvent(int prioritySequenceNumber) {
-notifyDataAvailable();
+notifyDataAvailable(this.subpartitionView);
+}
+
+@VisibleForTesting
+public void notifyDataAvailable() {
+notifyDataAvailable(subpartitionView);

Review Comment:
   `subpartitionView` -> `this.subpartitionView` to be consistent with 
`notifyPriorityEvent`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java:
##
@@ -69,10 +69,10 @@ default void notifyPriorityEvent(int priorityBufferNumber) 
{}
  * Get the availability and backlog of the view. The availability 
represents if the view is
  * ready to get buffer from it. The backlog represents the number of 
available data buffers.
  *
- * @param numCreditsAvailable the available credits for this {@link 
ResultSubpartitionView}.
+ * @param isCreditAvailable the availability of credits for this {@link 
ResultSubpartitionView}.
  * @return availability and backlog.
  */
-AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable);
+AvailabilityWithBacklog getAvailabilityAndBacklog(boolean 
isCreditAvailable);

Review Comment:
   Is this commit a must-have change, or just a refactor? 



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IndexRange;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+
+/** A collection of subpartition indexes. */
+public class ResultSubpartitionIndexSet extends IndexRange {

Review Comment:
   I wonder what is the difference between `IndexRange` and `IndexSet`. From my 
side, it's better to align with the name of the parent class.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##
@@ -291,13 +291,18 @@ public void releaseAllResources() throws IOException {
 }
 
 @Override
-public void notifyDataAvailable() {
+public void notifyDataAvailable(ResultSubpartitionView view) {
 requestQueue.notifyReaderNonEmpty(this);
 }
 
 @Override
 public void notifyPriorityEvent(int prioritySequenceNumber) {
-notifyDataAvailable();
+notifyDataAvailable(this.subpartitionView);
+}
+
+@VisibleForTesting

Review Comment:
   Why is this method `@VisibleForTesting`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34080][configuration] Simplify the Configuration [flink]

2024-01-14 Thread via GitHub


flinkbot commented on PR #24088:
URL: https://github.com/apache/flink/pull/24088#issuecomment-1891392365

   
   ## CI report:
   
   * 39fc4c7369deceabffb7bc2496300e8a8061faba UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-34080][configuration] Simplify the Configuration [flink]

2024-01-14 Thread via GitHub


1996fanrui opened a new pull request, #24088:
URL: https://github.com/apache/flink/pull/24088

   ## What is the purpose of the change
   
   See the part 2.2 of [FLIP-405](https://cwiki.apache.org/confluence/x/6Yr5E)
   
   
   ## Brief change log
   
- [FLINK-34080][configuration] Add the `<T> T get(ConfigOption<T> configOption, 
T overrideDefault)` method for Configuration (see the sketch below)
   - [FLINK-34080][configuration] Deprecate all getXxx and setXxx methods for 
Configuration
   - [FLINK-34080][configuration] Mark `setBytes` and `getBytes` of 
Configuration as `@Internal`
   - [FLINK-34080][configuration] Remove the `@Deprecated` for 
`getString(String key, String defaultValue)` of Configuration
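
   A minimal sketch of how the new `get` overload is intended to be used. The option is hypothetical, and the semantics assumed here follow FLIP-405: return the configured value if present, otherwise the caller-supplied override instead of the option's own default.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import java.time.Duration;

public class OverrideDefaultExample {
    // Hypothetical option with a 30s default.
    private static final ConfigOption<Duration> TIMEOUT =
            ConfigOptions.key("example.timeout")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(30));

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Not set in conf: the caller-supplied override wins over the 30s default.
        Duration t1 = conf.get(TIMEOUT, Duration.ofSeconds(5)); // 5s

        conf.set(TIMEOUT, Duration.ofSeconds(60));
        Duration t2 = conf.get(TIMEOUT, Duration.ofSeconds(5)); // 60s, explicit value wins

        System.out.println(t1 + " " + t2);
    }
}
```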
   
   
   ## Verifying this change
   
 - Added `ConfigurationTest#testGetWithOverrideDefault`
 - Improved the `DelegatingConfigurationTest#testGetWithOverrideDefault`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no 
 - The runtime per-record code paths (performance sensitive):  no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34080) Simplify the Configuration

2024-01-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34080:
---
Labels: pull-request-available  (was: )

> Simplify the Configuration
> --
>
> Key: FLINK-34080
> URL: https://issues.apache.org/jira/browse/FLINK-34080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> This Jira is 2.2 part of FLIP-405:
>  * 2.2.1 Update Configuration to encourage the usage of ConfigOption over 
> string configuration key
>  * 2.2.2 Introduce public <T> T get(ConfigOption<T> configOption, T 
> overrideDefault)
>  * 2.2.3 Deprecate some unnecessary setXxx and getXxx methods in Configuration



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34077) Python Sphinx version error

2024-01-14 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang updated FLINK-34077:
-
Issue Type: Technical Debt  (was: Bug)

> Python Sphinx version error
> ---
>
> Key: FLINK-34077
> URL: https://issues.apache.org/jira/browse/FLINK-34077
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Yunfeng Zhou
>Assignee: Xingbo Huang
>Priority: Major
>  Labels: pull-request-available
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
>  
> {code:java}
> Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
> _build/doctrees -a -W . _build/html
> Jan 14 15:49:17 Running Sphinx v4.5.0
> Jan 14 15:49:17
> Jan 14 15:49:17 Sphinx version error:
> Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project 
> needs at least Sphinx v5.0; it therefore cannot be built with this version.   
>  
> Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
> Jan 14 15:49:17 make: *** [html] Error 2
> Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34077) Python Sphinx version error

2024-01-14 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang updated FLINK-34077:
-
Summary: Python Sphinx version error  (was: Sphinx version needs upgrade)

> Python Sphinx version error
> ---
>
> Key: FLINK-34077
> URL: https://issues.apache.org/jira/browse/FLINK-34077
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Yunfeng Zhou
>Assignee: Xingbo Huang
>Priority: Major
>  Labels: pull-request-available
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
>  
> {code:java}
> Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
> _build/doctrees -a -W . _build/html
> Jan 14 15:49:17 Running Sphinx v4.5.0
> Jan 14 15:49:17
> Jan 14 15:49:17 Sphinx version error:
> Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project 
> needs at least Sphinx v5.0; it therefore cannot be built with this version.   
>  
> Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
> Jan 14 15:49:17 make: *** [html] Error 2
> Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34077) Sphinx version needs upgrade

2024-01-14 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang resolved FLINK-34077.
--
Resolution: Fixed

Merged into master via d2fbe464b1a353a7eb35926299d5c048647a3073

> Sphinx version needs upgrade
> 
>
> Key: FLINK-34077
> URL: https://issues.apache.org/jira/browse/FLINK-34077
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Yunfeng Zhou
>Assignee: Xingbo Huang
>Priority: Major
>  Labels: pull-request-available
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
>  
> {code:java}
> Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
> _build/doctrees -a -W . _build/html
> Jan 14 15:49:17 Running Sphinx v4.5.0
> Jan 14 15:49:17
> Jan 14 15:49:17 Sphinx version error:
> Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project 
> needs at least Sphinx v5.0; it therefore cannot be built with this version.   
>  
> Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
> Jan 14 15:49:17 make: *** [html] Error 2
> Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34077][python] Limits some sphinxcontrib packages upper bounds [flink]

2024-01-14 Thread via GitHub


HuangXingBo merged PR #24086:
URL: https://github.com/apache/flink/pull/24086


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-14 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806641#comment-17806641
 ] 

Xintong Song commented on FLINK-33728:
--

Sure, thanks for volunteering to work on this. I've assigned the ticket to you. 
Please go ahead.

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd was responding slowly. 
> After Kube recovered an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver re-watched when receiving 
> ResourceVersionTooOld, which put great pressure on the API server and made the 
> API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to re-watch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover TM failures, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-14 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-33728:


Assignee: xiaogang zhou

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes etcd was responding slowly. 
> After Kube recovered an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver re-watched when receiving 
> ResourceVersionTooOld, which put great pressure on the API server and made the 
> API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to re-watch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover TM failures, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34077][python] Limits some sphinxcontrib packages upper bounds [flink]

2024-01-14 Thread via GitHub


HuangXingBo commented on PR #24086:
URL: https://github.com/apache/flink/pull/24086#issuecomment-1891366044

   
https://dev.azure.com/hxbks2ks/FLINK-TEST/_build/results?buildId=2214=logs=fba17979-6d2e-591d-72f1-97cf42797c11
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34080) Simplify the Configuration

2024-01-14 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-34080:

Priority: Blocker  (was: Major)

> Simplify the Configuration
> --
>
> Key: FLINK-34080
> URL: https://issues.apache.org/jira/browse/FLINK-34080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
> Fix For: 1.19.0
>
>
> This Jira is 2.2 part of FLIP-405:
>  * 2.2.1 Update Configuration to encourage the usage of ConfigOption over 
> string configuration key
>  * 2.2.2 Introduce public <T> T get(ConfigOption<T> configOption, T 
> overrideDefault)
>  * 2.2.3 Deprecate some unnecessary setXxx and getXxx methods in Configuration



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34082) Remove deprecated methods of Configuration in 2.0

2024-01-14 Thread Rui Fan (Jira)
Rui Fan created FLINK-34082:
---

 Summary: Remove deprecated methods of Configuration in 2.0
 Key: FLINK-34082
 URL: https://issues.apache.org/jira/browse/FLINK-34082
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: 2.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34081) Refactor all callers that use the public Xxx getXxx(ConfigOption<Xxx> configOption) and public void setXxx(ConfigOption<Xxx> key, Xxx value)

2024-01-14 Thread Rui Fan (Jira)
Rui Fan created FLINK-34081:
---

 Summary: Refactor all callers that use the public Xxx 
getXxx(ConfigOption<Xxx> configOption) and public void 
setXxx(ConfigOption<Xxx> key, Xxx value) 
 Key: FLINK-34081
 URL: https://issues.apache.org/jira/browse/FLINK-34081
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Rui Fan
Assignee: Rui Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34080) Simplify the Configuration

2024-01-14 Thread Rui Fan (Jira)
Rui Fan created FLINK-34080:
---

 Summary: Simplify the Configuration
 Key: FLINK-34080
 URL: https://issues.apache.org/jira/browse/FLINK-34080
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: 1.19.0


This Jira is 2.2 part of FLIP-405:
 * 2.2.1 Update Configuration to encourage the usage of ConfigOption over 
string configuration key
 * 2.2.2 Introduce public <T> T get(ConfigOption<T> configOption, T 
overrideDefault)
 * 2.2.3 Deprecate some unnecessary setXxx and getXxx methods in Configuration



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-14 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806640#comment-17806640
 ] 

xiaogang zhou commented on FLINK-33728:
---

[~xtsong] [~wangyang0918] Ok, glad to hear that. Would you please help assign 
the ticket to me?

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when the Kubernetes ETCD responded slowly. 
> After Kubernetes recovered an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver re-watched upon receiving 
> ResourceVersionTooOld, which put great pressure on the API server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We could simply ignore the disconnection of the watching process and try to re-watch 
> once a new requestResource is called. We can rely on the Akka heartbeat 
> timeout to discover TM failures, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34079) FLIP-405: Migrate string configuration key to ConfigOption

2024-01-14 Thread Rui Fan (Jira)
Rui Fan created FLINK-34079:
---

 Summary: FLIP-405: Migrate string configuration key to ConfigOption
 Key: FLINK-34079
 URL: https://issues.apache.org/jira/browse/FLINK-34079
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Rui Fan
Assignee: Xuannan Su
 Fix For: 1.19.0


This is an umbrella Jira of 
[FLIP-405|https://cwiki.apache.org/confluence/x/6Yr5E]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34078) Move InternalKeyContext classes from o.a.f.runtime.state.heap to o.a.f.runtime.state package

2024-01-14 Thread Jinzhong Li (Jira)
Jinzhong Li created FLINK-34078:
---

 Summary: Move InternalKeyContext classes from 
o.a.f.runtime.state.heap to o.a.f.runtime.state package
 Key: FLINK-34078
 URL: https://issues.apache.org/jira/browse/FLINK-34078
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Jinzhong Li
 Attachments: image-2024-01-15-12-57-12-667.png

h3. Motivation:

When the RocksDB state backend throws an illegal key-group check exception, 
the exception stack contains a class scoped to the heap state backend, which looks 
strange to users.

!image-2024-01-15-12-57-12-667.png|width=555,height=68!
h3. Proposed changes:

InternalKeyContext and InternalKeyContextImpl are commonly used by all state 
backends (heap/rocksdb/changelog), so they should be moved from the 
org.apache.flink.runtime.state.heap package to the org.apache.flink.runtime.state 
package.
h3. Compatibility:

InternalKeyContext is annotated with @Internal, so this change has no 
compatibility issues.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33576][core] Introduce new Flink conf file 'config.yaml' supporting standard YAML syntax. [flink]

2024-01-14 Thread via GitHub


JunRuiLee commented on PR #23852:
URL: https://github.com/apache/flink/pull/23852#issuecomment-1891254763

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32978) Deprecate RichFunction#open(Configuration parameters)

2024-01-14 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-32978.

Resolution: Fixed

Breaking changes reverted.
master (1.19): 1d6150f386d9c9ec61f4ab30853b915de7712047

> Deprecate RichFunction#open(Configuration parameters)
> -
>
> Key: FLINK-32978
> URL: https://issues.apache.org/jira/browse/FLINK-32978
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Core
>Affects Versions: 1.19.0
>Reporter: Wencong Liu
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> The 
> [FLIP-344|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231]
>  has decided that the parameter in RichFunction#open will be removed in the 
> next major version. We should deprecate it now and remove it in Flink 2.0. 
> The removal will be tracked in 
> [FLINK-6912|https://issues.apache.org/jira/browse/FLINK-6912].
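
For illustration only (not from the issue): after the deprecation, a rich function
stops overriding open(Configuration) and uses the parameter-less style instead. The
OpenContext-based signature below follows FLIP-344 but is an assumption and may
differ in detail from the final API:

{code:java}
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;

public class UpperCaseMapper extends RichMapFunction<String, String> {

    // Assumed replacement per FLIP-344: open(OpenContext) instead of the
    // deprecated open(Configuration parameters).
    @Override
    public void open(OpenContext openContext) throws Exception {
        // Initialize resources here; job-specific settings should come from
        // getRuntimeContext() rather than the removed Configuration parameter.
    }

    @Override
    public String map(String value) {
        return value.toUpperCase();
    }
}
{code}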



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Revert the breaking change to the public implementations of RichFunction [flink]

2024-01-14 Thread via GitHub


xintongsong closed pull request #24067: [hotfix] Revert the breaking change to 
the public implementations of RichFunction
URL: https://github.com/apache/flink/pull/24067


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds

2024-01-14 Thread Junrui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806608#comment-17806608
 ] 

Junrui Li commented on FLINK-27756:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56300=logs=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819=2dd510a3-5041-5201-6dc3-54d310f68906

> Fix intermittently failing test in 
> AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
> --
>
> Key: FLINK-27756
> URL: https://issues.apache.org/jira/browse/FLINK-27756
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.19.0
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> h2. Motivation
>  - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of 
> {{AsyncSinkWriterTest}} has been reported to fail intermittently on build 
> pipeline causing blocking of new changes.
>  - Reporting build is [linked 
> |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-14 Thread via GitHub


JunRuiLee commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1451894290


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##
@@ -792,7 +792,11 @@ public CompletableFuture 
disposeSavepoint(String savepointPath, Tim
 
 try {
 Checkpoints.disposeSavepoint(
-savepointPath, configuration, classLoader, 
log);
+savepointPath,
+new Configuration(),

Review Comment:
   Because disposeSavepoint is a cluster-wide operation, it does not require 
job-specific configuration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-14 Thread via GitHub


JunRuiLee commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1451892446


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java:
##
@@ -82,7 +83,8 @@ public void setUp() {
 @Test
 public void testImmediateCheckpointing() throws Exception {
 env.setRestartStrategy(RestartStrategies.noRestart());
-env.enableCheckpointing(Long.MAX_VALUE - 1);
+env.enableCheckpointing(
+Duration.ofNanos(Long.MAX_VALUE /* max allowed by FLINK 
*/).toMillis());

Review Comment:
   1. The previous use of Long.MAX_VALUE - 1 as the checkpoint interval was 
intentional, because using Long.MAX_VALUE itself would effectively disable 
checkpointing, which isn't the desired behavior. The intention was to enable 
checkpointing with an interval so large that it effectively prevents any 
checkpoint from being triggered during the test run. Using 
Duration.ofNanos(Long.MAX_VALUE).toMillis() aligns well with this scenario (see the sketch below).
   2. I will review all instances where a large checkpoint interval is used and 
add comments to clarify the reason for using these intervals.
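
   A minimal sketch of the conversion in point 1 (illustrative only, not part of the PR):
   ```java
   import java.time.Duration;

   public class CheckpointIntervalSketch {
       public static void main(String[] args) {
           // Long.MAX_VALUE nanoseconds expressed in milliseconds:
           long intervalMillis = Duration.ofNanos(Long.MAX_VALUE).toMillis();
           // 9223372036854 ms, roughly 292 years: checkpointing stays enabled
           // but no checkpoint is ever triggered during a test run.
           System.out.println(intervalMillis);
       }
   }
   ```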



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806605#comment-17806605
 ] 

Jiabao Sun commented on FLINK-34076:


Hi [~khanhvu], did you add the dependencies with "provided" scope to the classpath?
It runs correctly for me locally.


{code:xml}
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kinesis</artifactId>
        <version>${flink.connector.kinesis.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
{code}


> flink-connector-base missing fails kinesis table sink to create
> ---
>
> Key: FLINK-34076
> URL: https://issues.apache.org/jira/browse/FLINK-34076
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Khanh Vu
>Priority: Major
> Attachments: screenshot-1.png
>
>
> The 
> [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
>  which stops bundling `flink-connector-base` with `flink-connector-kinesis` 
> has caused the Kinesis sink to fail to be created when using the Table API, as 
> required classes from `flink-connector-base` are not loaded at runtime.
> E.g. with only the following dependency in pom.xml:
> {code:xml}
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kinesis</artifactId>
>             <version>${flink.connector.kinesis.version}</version>
>         </dependency>
> {code}
> and a minimal job definition:
> {code:java}
>   public static void main(String[] args) throws Exception {
>   // create data stream environment
>   StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>   StreamTableEnvironment tEnv = 
> StreamTableEnvironment.create(sEnv);
>   Schema a = Schema.newBuilder().column("a", 
> DataTypes.STRING()).build();
>   TableDescriptor descriptor =
>   TableDescriptor.forConnector("kinesis")
>   .schema(a)
>   .format("json")
>   .build();
>   tEnv.createTemporaryTable("sinkTable", descriptor);
>   tEnv.executeSql("CREATE TABLE sinkTable " + 
> descriptor.toString()).print();
>   }
> {code}
> following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
>   at 
> jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) 
> ~[?:?]
>   at 
> jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>  ~[?:?]
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
>   ... 28 more
> {code}
> The fix is to explicitly specify `flink-connector-base` as dependency of the 
> project:
> {code:xml}
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis</artifactId>
>     <version>${flink.connector.kinesis.version}</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-base</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
>   </dependency>
> {code}
> In general, `flink-connector-base` should be pulled in by default when 
> pulling in the Kinesis connector; the current separation adds unnecessary 
> hassle to using the connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33792) Generate the same code for the same logic

2024-01-14 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li resolved FLINK-33792.

Fix Version/s: 1.19.0
   Resolution: Fixed

Implemented via d26c1b668b7febc60aab1e4174f568958cd615d3 (1.19.0)

[~zoudan] Thanks for your work! And also thanks [~lsy] for the review!

> Generate the same code for the same logic
> -
>
> Key: FLINK-33792
> URL: https://issues.apache.org/jira/browse/FLINK-33792
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Dan Zou
>Assignee: Dan Zou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Generate the same code for the same logic, so that we may reuse the generated 
> code between different jobs. This is the precondition for FLINK-28691. The 
> current issue is that we use a self-incrementing counter in CodeGenUtils#newName, 
> which means we cannot get the same generated class for two queries even 
> when they are exactly the same.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33768] Support dynamic source parallelism inference for batch jobs [flink]

2024-01-14 Thread via GitHub


flinkbot commented on PR #24087:
URL: https://github.com/apache/flink/pull/24087#issuecomment-1891229367

   
   ## CI report:
   
   * e81e5b792e05f208b281274fde872a8ca0035253 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]

2024-01-14 Thread via GitHub


libenchao closed pull request #23984: [FLINK-33792] Generate the same code for 
the same logic
URL: https://github.com/apache/flink/pull/23984


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Jiabao Sun (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34076 ]


Jiabao Sun deleted comment on FLINK-34076:


was (Author: jiabao.sun):
Hi [~khanhvu], do you add dependencies with "provided" scope to classpath?

 !screenshot-1.png! 

> flink-connector-base missing fails kinesis table sink to create
> ---
>
> Key: FLINK-34076
> URL: https://issues.apache.org/jira/browse/FLINK-34076
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Khanh Vu
>Priority: Major
> Attachments: screenshot-1.png
>
>
> The 
> [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
>  which stops bundling `flink-connector-base` with `flink-connector-kinesis` 
> has caused kinesis sink failing to create when using Table API as required 
> classes from `flink-connector-base` are not loaded in runtime.
> E.g. with following depenency only in pom.xml
> {code:xml}
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kinesis</artifactId>
>             <version>${flink.connector.kinesis.version}</version>
>         </dependency>
> {code}
> and a minimal job definition:
> {code:java}
>   public static void main(String[] args) throws Exception {
>   // create data stream environment
>   StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>   StreamTableEnvironment tEnv = 
> StreamTableEnvironment.create(sEnv);
>   Schema a = Schema.newBuilder().column("a", 
> DataTypes.STRING()).build();
>   TableDescriptor descriptor =
>   TableDescriptor.forConnector("kinesis")
>   .schema(a)
>   .format("json")
>   .build();
>   tEnv.createTemporaryTable("sinkTable", descriptor);
>   tEnv.executeSql("CREATE TABLE sinkTable " + 
> descriptor.toString()).print();
>   }
> {code}
> following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
>   at 
> jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) 
> ~[?:?]
>   at 
> jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>  ~[?:?]
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
>   ... 28 more
> {code}
> The fix is to explicitly specify `flink-connector-base` as dependency of the 
> project:
> {code:xml}
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis</artifactId>
>     <version>${flink.connector.kinesis.version}</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-base</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
>   </dependency>
> {code}
> In general, `flink-connector-base` should be pulled in by default when 
> pulling in the kinesis connector, the current separation adds unnecessary 
> hassle to use the connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun updated FLINK-34076:
---
Attachment: screenshot-1.png

> flink-connector-base missing fails kinesis table sink to create
> ---
>
> Key: FLINK-34076
> URL: https://issues.apache.org/jira/browse/FLINK-34076
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Khanh Vu
>Priority: Major
> Attachments: screenshot-1.png
>
>
> The 
> [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
>  which stops bundling `flink-connector-base` with `flink-connector-kinesis` 
> has caused kinesis sink failing to create when using Table API as required 
> classes from `flink-connector-base` are not loaded in runtime.
> E.g. with following depenency only in pom.xml
> {code:xml}
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kinesis</artifactId>
>             <version>${flink.connector.kinesis.version}</version>
>         </dependency>
> {code}
> and a minimal job definition:
> {code:java}
>   public static void main(String[] args) throws Exception {
>   // create data stream environment
>   StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>   StreamTableEnvironment tEnv = 
> StreamTableEnvironment.create(sEnv);
>   Schema a = Schema.newBuilder().column("a", 
> DataTypes.STRING()).build();
>   TableDescriptor descriptor =
>   TableDescriptor.forConnector("kinesis")
>   .schema(a)
>   .format("json")
>   .build();
>   tEnv.createTemporaryTable("sinkTable", descriptor);
>   tEnv.executeSql("CREATE TABLE sinkTable " + 
> descriptor.toString()).print();
>   }
> {code}
> following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
>   at 
> jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) 
> ~[?:?]
>   at 
> jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>  ~[?:?]
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
>   ... 28 more
> {code}
> The fix is to explicitly specify `flink-connector-base` as dependency of the 
> project:
> {code:xml}
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis</artifactId>
>     <version>${flink.connector.kinesis.version}</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-base</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
>   </dependency>
> {code}
> In general, `flink-connector-base` should be pulled in by default when 
> pulling in the kinesis connector, the current separation adds unnecessary 
> hassle to use the connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806603#comment-17806603
 ] 

Jiabao Sun commented on FLINK-34076:


Hi [~khanhvu], did you add the dependencies with "provided" scope to the classpath?

 !screenshot-1.png! 

> flink-connector-base missing fails kinesis table sink to create
> ---
>
> Key: FLINK-34076
> URL: https://issues.apache.org/jira/browse/FLINK-34076
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Khanh Vu
>Priority: Major
> Attachments: screenshot-1.png
>
>
> The 
> [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
>  which stops bundling `flink-connector-base` with `flink-connector-kinesis` 
> has caused kinesis sink failing to create when using Table API as required 
> classes from `flink-connector-base` are not loaded in runtime.
> E.g. with following depenency only in pom.xml
> {code:xml}
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kinesis</artifactId>
>             <version>${flink.connector.kinesis.version}</version>
>         </dependency>
> {code}
> and a minimal job definition:
> {code:java}
>   public static void main(String[] args) throws Exception {
>   // create data stream environment
>   StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>   StreamTableEnvironment tEnv = 
> StreamTableEnvironment.create(sEnv);
>   Schema a = Schema.newBuilder().column("a", 
> DataTypes.STRING()).build();
>   TableDescriptor descriptor =
>   TableDescriptor.forConnector("kinesis")
>   .schema(a)
>   .format("json")
>   .build();
>   tEnv.createTemporaryTable("sinkTable", descriptor);
>   tEnv.executeSql("CREATE TABLE sinkTable " + 
> descriptor.toString()).print();
>   }
> {code}
> following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
>   at 
> jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) 
> ~[?:?]
>   at 
> jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>  ~[?:?]
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
>   ... 28 more
> {code}
> The fix is to explicitly specify `flink-connector-base` as dependency of the 
> project:
> {code:xml}
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis</artifactId>
>     <version>${flink.connector.kinesis.version}</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-base</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
>   </dependency>
> {code}
> In general, `flink-connector-base` should be pulled in by default when 
> pulling in the kinesis connector, the current separation adds unnecessary 
> hassle to use the connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33768] Support dynamic source parallelism inference for batch jobs [flink]

2024-01-14 Thread via GitHub


SinBex opened a new pull request, #24087:
URL: https://github.com/apache/flink/pull/24087

   ## What is the purpose of the change
   
   Currently, for JobVertices without parallelism configured, the 
AdaptiveBatchScheduler dynamically infers the vertex parallelism based on the 
volume of input data. Specifically, for Source vertices, it uses the value of 
`execution.batch.adaptive.auto-parallelism.default-source-parallelism` as the 
fixed parallelism. If this is not set by the user, the default value of 1  is 
used as the source parallelism, which is actually a temporary implementation 
solution.
   
   We aim to support dynamic source parallelism inference for batch jobs.
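
   Purely as an illustration (the names and shapes below are hypothetical and are 
   not the interfaces added by this PR), a source-side parallelism inference hook 
   might look roughly like this:
   ```java
   /**
    * Hypothetical sketch only: the real interface introduced by this PR
    * (DynamicParallelismInference) may differ in name, methods and types.
    */
   public interface ParallelismInferenceSketch {

       /** Infer a parallelism from the estimated input size, capped by the scheduler. */
       int inferParallelism(long estimatedInputBytes, int maxParallelism);

       /** Simple data-volume heuristic: roughly one task per bytesPerTask, bounded above. */
       static int boundedByDataVolume(long estimatedInputBytes, long bytesPerTask, int maxParallelism) {
           long tasks = Math.max(1, estimatedInputBytes / Math.max(1, bytesPerTask));
           return (int) Math.min(tasks, maxParallelism);
       }
   }
   ```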
   ## Brief change log
   
 - *Lazily initialize the parallelism of the OperatorCoordinator.*
 - *Add the `DynamicParallelismInference` and `DynamicFilteringInfo` 
interfaces, and enable the `SourceCoordinator` to invoke corresponding methods 
for dynamic parallelism inference.*
 - *The `AdaptiveBatchScheduler` applies dynamic source parallelism 
inference, and to avoid blocking the main thread by calling external systems, 
we have transformed the scheduling process to be asynchronous.*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - *Added integration tests to verify the end-to-end logic of dynamic 
parallelism inference, see 
`AdaptiveBatchSchedulerITCase#testSchedulingWithDynamicSourceParallelismInference`
 for details.*
 - *Added unit tests for the newly added methods in classes such as 
`SourceCoordinator` and `AdaptiveBatchScheduler`.*
 - *Manually verified the feature on a Flink session cluster (1 JobManager, 
77 TaskManagers), including dynamic inference of parallelism, asynchronous 
scheduling, and execution exceptions in `DynamicParallelismInference`, all 
performing as expected.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs / JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode

2024-01-14 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806600#comment-17806600
 ] 

Yang Wang commented on FLINK-34007:
---

Maybe I did not make myself clear. I mean the old leader JM should try to 
remove the {{control-plane.alpha.kubernetes.io/leader}} annotation of the HA 
ConfigMap when it loses leadership. From the fabric8 K8s client 
implementation[1], the {{isLeader}} callback is only executed when the holder 
identity changes.

 

[1]. 
[https://github.com/fabric8io/kubernetes-client/blob/v6.6.2/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L232]
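
A rough sketch of what "remove the leader annotation after losing leadership" could 
look like with the fabric8 client (illustrative only; the ConfigMap name and the exact 
call site are assumptions, this is not code from Flink):

{code:java}
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class LeaderAnnotationCleanup {

    private static final String LEADER_ANNOTATION = "control-plane.alpha.kubernetes.io/leader";

    // Hypothetical helper: invoked by the old leader once it has revoked its leadership.
    static void clearLeaderAnnotation(KubernetesClient client, String namespace, String haConfigMap) {
        client.configMaps()
                .inNamespace(namespace)
                .withName(haConfigMap)
                .edit(cm -> new ConfigMapBuilder(cm)
                        .editMetadata()
                        .removeFromAnnotations(LEADER_ANNOTATION)
                        .endMetadata()
                        .build());
    }

    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            clearLeaderAnnotation(client, "default", "my-flink-cluster-config-map");
        }
    }
}
{code}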

> Flink Job stuck in suspend state after losing leadership in HA Mode
> ---
>
> Key: FLINK-34007
> URL: https://issues.apache.org/jira/browse/FLINK-34007
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2
>Reporter: Zhenqiu Huang
>Priority: Major
> Attachments: Debug.log, job-manager.log
>
>
> The observation is that the Job Manager goes into the suspended state, with a failed 
> container unable to register itself with the resource manager before the timeout.
> JM log: see attached.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange

2024-01-14 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806599#comment-17806599
 ] 

Jinzhong Li commented on FLINK-34050:
-

Thanks for your reply. [~mayuehappy]   [~masteryhx] 

IMO, it is unreasonable that redundant data can't be cleaned up for a long time 
after rescaling. Especially in scenarios where disk space is very tight, this 
behavior is a major drawback.

I agree that deleteRange + deleteFilesInRanges could be a good default 
behavior.

As for the performance check of deleteRange + deleteFilesInRanges vs. 
deleteRange alone, I think the rescaling state benchmark should satisfy this [1].

WDYT? 

 

[1] 
https://github.com/apache/flink-benchmarks/blob/master/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java

> Rocksdb state has space amplification after rescaling with DeleteRange
> --
>
> Key: FLINK-34050
> URL: https://issues.apache.org/jira/browse/FLINK-34050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Priority: Major
> Attachments: image-2024-01-10-21-23-48-134.png, 
> image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png
>
>
> FLINK-21321 uses deleteRange to speed up RocksDB rescaling; however, it can 
> cause space amplification in some cases.
> We can reproduce this problem using wordCount job:
> 1) before rescaling, state operator in wordCount job has 2 parallelism and 
> 4G+ full checkpoint size;
> !image-2024-01-10-21-24-10-983.png|width=266,height=130!
> 2) then restart job with 4 parallelism (for state operator),  the full 
> checkpoint size of new job will be 8G+ ;
> 3) after many successful checkpoints, the full checkpoint size is still 8G+;
> !image-2024-01-10-21-28-24-312.png|width=454,height=111!
>  
> The root cause of this issue is that the deleted keyGroupRange does not 
> overlap with current DB keyGroupRange, so new data written into rocksdb after 
> rescaling almost never do LSM compaction with the deleted data (belonging to 
> other keyGroupRange.)
>  
> And the space amplification may affect Rocksdb read performance and disk 
> space usage after rescaling. It looks like a regression due to the 
> introduction of deleteRange for rescaling optimization.
>  
> To solve this problem, I think maybe we can invoke 
> RocksDB's deleteFilesInRanges after deleteRange?
> {code:java}
> public static void clipDBWithKeyGroupRange() {
>   //...
>   List<byte[]> ranges = new ArrayList<>();
>   //...
>   deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
>   ranges.add(beginKeyGroupBytes);
>   ranges.add(endKeyGroupBytes);
>   // additionally drop the SST files that fall entirely into the deleted ranges
>   for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
>     db.deleteFilesInRanges(columnFamilyHandle, ranges, false);
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33768] Support dynamic source parallelism inference for batch jobs [flink]

2024-01-14 Thread via GitHub


SinBex closed pull request #24078: [FLINK-33768] Support dynamic source 
parallelism inference for batch jobs
URL: https://github.com/apache/flink/pull/24078


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34035) when flinksql with group by partition field, some errors occured in jobmanager.log

2024-01-14 Thread hansonhe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hansonhe updated FLINK-34035:
-
Summary: when flinksql with group by partition field, some errors occured 
in jobmanager.log  (was: when flinksql with group by partition some errors 
field occured in jobmanager.log)

> when flinksql with group by partition field, some errors occured in 
> jobmanager.log
> --
>
> Key: FLINK-34035
> URL: https://issues.apache.org/jira/browse/FLINK-34035
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
>Reporter: hansonhe
>Priority: Major
>
> flink.version=1.17.1
> kyuubi.version=1.8.0
> hive.version=3.1.2
> when running some Hive SQL as follows:
> CREATE CATALOG bidwhive WITH ('type' = 'hive', 'hive-version' = '3.1.2', 
> 'default-database' = 'test');
> (1) select count(*) from bidwhive.test.dws_test where dt='2024-01-02';
> +--------+
> | EXPR$0 |
> +--------+
> |   1317 |
> +--------+
> It's OK. There are no errors anywhere.
> (2) select dt, count(*) from bidwhive.test.dws_test where dt='2024-01-02' group by dt;
> +------------+--------+
> | dt         | EXPR$1 |
> +------------+--------+
> | 2024-01-02 |   1317 |
> +------------+--------+
> It gets the correct result. But when I check jobmanager.log, I find some 
> errors appearing repeatedly, as follows. Sometimes the errors also appear on 
> the client terminal. I don't know whether these errors will affect the task at 
> runtime or not. Can somebody help take a look?
> '''
> 2024-01-09 14:03:25,979 WARN 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
> exception occurred when fetching query results
> java.util.concurrent.ExecutionException: 
> org.apache.flink.util.FlinkException: Coordinator of operator 
> e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the job vertex this 
> operator belongs to is not initialized. at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> ~[?:1.8.0_191]
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
> ~[?:1.8.0_191]
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:170)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:129)
>  [flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>  [flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>  [flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
>  [flink-table-planner_b1e58bff-c004-4dba-b7d4-fff4e8145073.jar:1.17.1]
> at 
> org.apache.flink.table.gateway.service.result.ResultStore$ResultRetrievalThread.run(ResultStore.java:155)
>  [flink-sql-gateway-1.17.1.jar:1.17.1]Caused by: 
> org.apache.flink.util.FlinkException: Coordinator of operator 
> e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the job vertex this 
> operator belongs to is not initialized. at 
> org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(DefaultOperatorCoordinatorHandler.java:135)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.deliverCoordinationRequestToCoordinator(SchedulerBase.java:1048)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.sendRequestToCoordinator(JobMaster.java:602)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.deliverCoordinationRequestToCoordinator(JobMaster.java:918)
>  ~[flink-dist-1.17.1.jar:1.17.1]
> at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) ~[?:?]
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_191]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>  ~[?:?]
> at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>  ~[?:?]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
>  ~[?:?]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
>  ~[?:?]
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>  ~[?:?]
> at 
> 

Re: [PR] [FLINK-34077][python] Limits some sphinxcontrib packages upper bounds [flink]

2024-01-14 Thread via GitHub


flinkbot commented on PR #24086:
URL: https://github.com/apache/flink/pull/24086#issuecomment-1891216727

   
   ## CI report:
   
   * 0e96556bd28f20a0e60008df3c9ceabd00598a3d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34077) Sphinx version needs upgrade

2024-01-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34077:
---
Labels: pull-request-available  (was: )

> Sphinx version needs upgrade
> 
>
> Key: FLINK-34077
> URL: https://issues.apache.org/jira/browse/FLINK-34077
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Yunfeng Zhou
>Assignee: Xingbo Huang
>Priority: Major
>  Labels: pull-request-available
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
>  
> {code:java}
> Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
> _build/doctrees -a -W . _build/html
> Jan 14 15:49:17 Running Sphinx v4.5.0
> Jan 14 15:49:17
> Jan 14 15:49:17 Sphinx version error:
> Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project 
> needs at least Sphinx v5.0; it therefore cannot be built with this version.   
>  
> Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
> Jan 14 15:49:17 make: *** [html] Error 2
> Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34077][python] Limits some sphinxcontrib packages upper bounds [flink]

2024-01-14 Thread via GitHub


HuangXingBo opened a new pull request, #24086:
URL: https://github.com/apache/flink/pull/24086

   ## What is the purpose of the change
   
   *This pull request will limit some sphinxcontrib packages upper bounds*
   
   
   ## Brief change log
   
 - *Limits some sphinxcontrib packages upper bounds*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *current tests*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34049][table]Refactor classes related to window TVF aggregation to prepare for non-aligned windows [flink]

2024-01-14 Thread via GitHub


xuyangzhong commented on PR #24068:
URL: https://github.com/apache/flink/pull/24068#issuecomment-1891209495

   The CI failure is caused by https://issues.apache.org/jira/browse/FLINK-34077


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33998) Flink Job Manager restarted after kube-apiserver connection intermittent

2024-01-14 Thread Xiangyan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806596#comment-17806596
 ] 

Xiangyan commented on FLINK-33998:
--

I can't reproduce it in 1.14.6. May I know where I can find the change history 
to confirm the fix? Thanks!

> Flink Job Manager restarted after kube-apiserver connection intermittent
> 
>
> Key: FLINK-33998
> URL: https://issues.apache.org/jira/browse/FLINK-33998
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.6
> Environment: Kubernetes 1.24
> Flink Operator 1.4
> Flink 1.13.6
>Reporter: Xiangyan
>Priority: Major
> Attachments: audit-log-no-restart.txt, audit-log-restart.txt, 
> connection timeout.png, jm-no-restart4.log, jm-restart4.log
>
>
> We are running Flink on AWS EKS and experienced Job Manager restart issue 
> when EKS control plane scaled up/in.
> I can reproduce this issue in my local environment too.
> Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster 
> by my own with below setup:
>  * Two kube-apiserver, only one is running at a time;
>  * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13);
>  * Enable Flink Job Manager HA;
>  * Configure Job Manager leader election timeout;
> {code:java}
> high-availability.kubernetes.leader-election.lease-duration: "60s"
> high-availability.kubernetes.leader-election.renew-deadline: "60s"{code}
> For testing, I switch the running kube-apiserver from one instance to another 
> each time. When the kube-apiserver is switching, I can see that some Job 
> Managers restart, but some are still running normally.
> Here is an example. When the kube-apiserver switched over at 05:53:08, both JMs 
> lost their connection to the kube-apiserver. But there were no more connection 
> errors within a few seconds; I guess the connection recovered by retry.
> However, one of the JMs (the 2nd one in the attached screenshot) reported a 
> "DefaultDispatcherRunner was revoked the leadership" error after the leader 
> election timeout (at 05:54:08) and then restarted itself, while the other JM 
> was still running normally.
> From the kube-apiserver audit logs, the normal JM was able to renew its leader 
> lease after the interruption, but there was no lease renewal request from the 
> failed JM until it restarted.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34077) Sphinx version needs upgrade

2024-01-14 Thread Xingbo Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806595#comment-17806595
 ] 

Xingbo Huang commented on FLINK-34077:
--

Some sphinxcontrib packages (sphinxcontrib-applehelp, sphinxcontrib-devhelp, 
sphinxcontrib-htmlhelp and so on) have released new versions that are not 
backwards compatible, so the document build fails. I will push a hotfix to limit 
the versions of these packages. Regarding upgrading the Sphinx version, some of 
the current conf configurations would need to be changed, as they are 
incompatible with newer Sphinx; I think that can be done as a separate feature.

> Sphinx version needs upgrade
> 
>
> Key: FLINK-34077
> URL: https://issues.apache.org/jira/browse/FLINK-34077
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Yunfeng Zhou
>Priority: Major
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
>  
> {code:java}
> Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
> _build/doctrees -a -W . _build/html
> Jan 14 15:49:17 Running Sphinx v4.5.0
> Jan 14 15:49:17
> Jan 14 15:49:17 Sphinx version error:
> Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project 
> needs at least Sphinx v5.0; it therefore cannot be built with this version.   
>  
> Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
> Jan 14 15:49:17 make: *** [html] Error 2
> Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34077) Sphinx version needs upgrade

2024-01-14 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang reassigned FLINK-34077:


Assignee: Xingbo Huang

> Sphinx version needs upgrade
> 
>
> Key: FLINK-34077
> URL: https://issues.apache.org/jira/browse/FLINK-34077
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Yunfeng Zhou
>Assignee: Xingbo Huang
>Priority: Major
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
>  
> {code:java}
> Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
> _build/doctrees -a -W . _build/html
> Jan 14 15:49:17 Running Sphinx v4.5.0
> Jan 14 15:49:17
> Jan 14 15:49:17 Sphinx version error:
> Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project 
> needs at least Sphinx v5.0; it therefore cannot be built with this version.   
>  
> Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
> Jan 14 15:49:17 make: *** [html] Error 2
> Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33057) Add options to disable creating job-id subdirectories under the checkpoint directory

2024-01-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-33057.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged 73b036c3 into master

> Add options to disable creating job-id subdirectories under the checkpoint 
> directory
> 
>
> Key: FLINK-33057
> URL: https://issues.apache.org/jira/browse/FLINK-33057
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.19.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> By default, Flink creates subdirectories named by UUID (job id) under the 
> checkpoint directory for each job. It's a good means to avoid collisions. 
> However, it also brings in some effort to remember/find the right directory 
> when recovering from a previous checkpoint. According to previous discussion 
> ([Yun 
> Tang's|https://issues.apache.org/jira/browse/FLINK-11789?focusedCommentId=16782314=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16782314]
>  and [Stephan 
> Ewen's|https://issues.apache.org/jira/browse/FLINK-9043?focusedCommentId=16409254=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16409254]
>  ), I think it would be useful to add an option to disable creating the UUID 
> subdirectories under the checkpoint directory. For compatibility 
> considerations, we create the subdirectories by default.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-14 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806592#comment-17806592
 ] 

Yang Wang commented on FLINK-33728:
---

Not only does the {{KubernetesResourceManagerDriver}} create a new watch when 
it receives {{TooOldResourceVersion}}, but the fabric8 K8s client also has 
similar logic in {{Reflector.java}}[1], which we are using for the 
Flink Kubernetes HA implementation.

In my opinion, the K8s APIServer should have the ability to protect itself by 
using flow control[2]: it will reject some requests if it cannot process all of 
them. Flink will then retry creating a new watch when the previous one failed. 
What Flink could additionally do is use an 
{{ExponentialBackoffDelayRetryStrategy}} to replace the current continuous retry 
strategy.

 

[1]. 
[https://github.com/fabric8io/kubernetes-client/blob/v6.6.2/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java#L288]

[2]. [https://kubernetes.io/docs/concepts/cluster-administration/flow-control/]
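
Not Flink code, just a minimal sketch of the kind of exponential-backoff retry 
suggested above, instead of retrying the watch creation in a tight loop (names are 
illustrative):

{code:java}
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class ExponentialBackoffRetry {

    /** Retries the given action with exponentially growing delays, up to maxAttempts. */
    static <T> T retryWithBackoff(Callable<T> action, int maxAttempts, long initialDelayMs, long maxDelayMs)
            throws Exception {
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) {
                    break;
                }
                TimeUnit.MILLISECONDS.sleep(delay);
                delay = Math.min(delay * 2, maxDelayMs); // exponential backoff, capped
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical usage: re-create a Kubernetes watch with backoff instead of immediately.
        String watch = retryWithBackoff(() -> "watch-created", 5, 500L, 30_000L);
        System.out.println(watch);
    }
}
{code}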

 

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when the Kubernetes ETCD responded slowly. 
> After Kubernetes recovered an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver re-watched upon receiving 
> ResourceVersionTooOld, which put great pressure on the API server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We could simply ignore the disconnection of the watching process and try to re-watch 
> once a new requestResource is called. We can rely on the Akka heartbeat 
> timeout to discover TM failures, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33057][Checkpointing] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]

2024-01-14 Thread via GitHub


masteryhx closed pull request #23509: [FLINK-33057][Checkpointing] Add options 
to disable creating job-id subdirectories under the checkpoint directory
URL: https://github.com/apache/flink/pull/23509


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33879) Hybrid Shuffle may stop working for a while during redistribution

2024-01-14 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-33879.
--
Resolution: Fixed

master(1.19) via 879509d7ca886f8f0ed4dd966e859d3c2a5aa231.

> Hybrid Shuffle may stop working for a while during redistribution
> -
>
> Key: FLINK-33879
> URL: https://issues.apache.org/jira/browse/FLINK-33879
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Jiang Xin
>Assignee: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, Hybrid Shuffle can work with the memory tier and disk tier 
> together; however, in the following scenario the result partition would stop 
> working.
> Suppose we have a shuffle task with 2 sub-partitions. The LocalBufferPool has 
> 15 buffers, so the memory tier can use at most 15 - (2*(2+1)+1) = 8 buffers 
> according to `TieredStorageMemoryManagerImpl#getMaxNonReclaimableBuffers`. If 
> the memory tier uses up all 8 buffers and the input channel consumes them 
> very slowly because of problems, e.g. an unstable network, the disk tier can 
> still work with 1 reserved buffer. However, if a redistribution happens now 
> and the pool size is decreased to less than 8, then the BufferAccumulator 
> cannot request buffers anymore, and thus the result partition stops working 
> until the buffers in the memory tier are recycled.
> The purpose is to make the result partition still work with the disk tier and 
> write the shuffle data to disk so that once the input channel is ready, the 
> data on the disk can be consumed immediately.
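
To make the arithmetic above concrete (the expression is quoted from the issue; which 
factor maps to which buffer count is an assumption, so treat this only as a sketch):

{code:java}
public class HybridShuffleBufferSketch {
    public static void main(String[] args) {
        int poolSize = 15;                           // LocalBufferPool size in the example
        int reservedForGuarantees = 2 * (2 + 1) + 1; // expression as written in the issue = 7

        // Max buffers the memory tier may hold non-reclaimably in this example.
        int maxNonReclaimable = poolSize - reservedForGuarantees;

        System.out.println(maxNonReclaimable); // 8
    }
}
{code}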



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]

2024-01-14 Thread via GitHub


reswqa merged PR #23957:
URL: https://github.com/apache/flink/pull/23957


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-30613) Improve resolving schema compatibility -- Milestone one

2024-01-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-30613.
--
Resolution: Fixed

merged 13921a08...0e5de813 into master

> Improve resolving schema compatibility -- Milestone one
> ---
>
> Key: FLINK-30613
> URL: https://issues.apache.org/jira/browse/FLINK-30613
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> In the milestone one, we should:
>  # Add an extra method 
> (TypeserializeSnapshotr#resolveSchemaCompatibility(TypeSerializerSnapshot 
> oldSerializerSnapshot)) in TypeSerializerSnapshot.java as above, and return 
> INCOMPATIBLE as default.
>  # Mark the original method as deprecated and it will use new method to 
> resolve as default.
>  # Implement the new method for all built-in TypeserializerSnapshots.
> See FLIP-263 for more details.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2024-01-14 Thread via GitHub


masteryhx closed pull request #21635: [FLINK-30613] Improve resolving schema 
compatibility -- Milestone one
URL: https://github.com/apache/flink/pull/21635


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-14 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806586#comment-17806586
 ] 

Xintong Song commented on FLINK-33728:
--

Sorry for the late reply, I was distracted by some other work last week.

I think you are right that the JM will kill itself if the re-watch does not 
succeed. It is expected in most cases that the client tries to re-watch 
immediately after seeing a ResourceVersionTooOld exception. However, if the 
first attempt to re-watch fails, the JM should not kill itself immediately, but 
may retry with some backoff interval.

cc [~wangyang0918]

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when the Kubernetes etcd was responding 
> slowly. After Kubernetes recovered about 1 hour later, thousands of Flink jobs 
> using KubernetesResourceManagerDriver re-watched when receiving 
> ResourceVersionTooOld, which put great pressure on the API server and made the 
> API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method?
>  
> We can just ignore the disconnection of the watching process and try to 
> re-watch once a new requestResource is called. We can then rely on the Akka 
> heartbeat timeout to discover TM failures, just like YARN mode does.
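Read literally, the proposal quoted above amounts to something like the
following sketch (class and method names are taken from the description; this is
a paraphrase of the proposal, not the actual or final Flink implementation):

{code:java}
// Sketch of the proposal, not the actual Flink code.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PodCallbackHandlerImpl {
    private static final Logger LOG = LoggerFactory.getLogger(PodCallbackHandlerImpl.class);

    void handleError(Throwable throwable) {
        // Proposal: do not call getResourceEventHandler().onError(throwable) here,
        // which would eventually fail the JM. Log instead, and rely on the next
        // requestResource() call to re-establish the watch; TM failures are still
        // detected via the heartbeat timeout.
        LOG.warn("Pod watch disconnected; will re-watch on the next resource request.", throwable);
    }
}
{code}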



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34077) Sphinx version needs upgrade

2024-01-14 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806581#comment-17806581
 ] 

xuyang commented on FLINK-34077:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56351=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901

> Sphinx version needs upgrade
> 
>
> Key: FLINK-34077
> URL: https://issues.apache.org/jira/browse/FLINK-34077
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Yunfeng Zhou
>Priority: Major
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
>  
> {code:java}
> Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
> _build/doctrees -a -W . _build/html
> Jan 14 15:49:17 Running Sphinx v4.5.0
> Jan 14 15:49:17
> Jan 14 15:49:17 Sphinx version error:
> Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project 
> needs at least Sphinx v5.0; it therefore cannot be built with this version.   
>  
> Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
> Jan 14 15:49:17 make: *** [html] Error 2
> Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34077) Sphinx version needs upgrade

2024-01-14 Thread Yunfeng Zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunfeng Zhou updated FLINK-34077:
-
Description: 
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
 
{code:java}
Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
_build/doctrees -a -W . _build/html
Jan 14 15:49:17 Running Sphinx v4.5.0
Jan 14 15:49:17
Jan 14 15:49:17 Sphinx version error:
Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project 
needs at least Sphinx v5.0; it therefore cannot be built with this version.
Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
Jan 14 15:49:17 make: *** [html] Error 2
Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code}
 
 

  was:
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
 
{code:java}
 Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
_build/doctrees -a -W . _build/htmlJan 14 15:49:17 Running Sphinx v4.5.0
Jan 14 15:49:17Jan 14 15:49:17 Sphinx version error:Jan 14 15:49:17 The 
sphinxcontrib.applehelp extension used by this project needs at least Sphinx 
v5.0; it therefore cannot be built with this version.Jan 14 15:49:17 
Makefile:76: recipe for target 'html' failedJan 14 15:49:17 make: *** 
[html] Error 2Jan 14 15:49:18 ==sphinx checks... 
[FAILED]=== {code}
 
 


> Sphinx version needs upgrade
> 
>
> Key: FLINK-34077
> URL: https://issues.apache.org/jira/browse/FLINK-34077
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Yunfeng Zhou
>Priority: Major
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
>  
> {code:java}
> Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
> _build/doctrees -a -W . _build/html
> Jan 14 15:49:17 Running Sphinx v4.5.0
> Jan 14 15:49:17
> Jan 14 15:49:17 Sphinx version error:
> Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project 
> needs at least Sphinx v5.0; it therefore cannot be built with this version.   
>  
> Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed
> Jan 14 15:49:17 make: *** [html] Error 2
> Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34077) Sphinx version needs upgrade

2024-01-14 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-34077:


 Summary: Sphinx version needs upgrade
 Key: FLINK-34077
 URL: https://issues.apache.org/jira/browse/FLINK-34077
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.19.0
Reporter: Yunfeng Zhou


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901]
 
{code:java}
 Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
_build/doctrees -a -W . _build/htmlJan 14 15:49:17 Running Sphinx v4.5.0
Jan 14 15:49:17Jan 14 15:49:17 Sphinx version error:Jan 14 15:49:17 The 
sphinxcontrib.applehelp extension used by this project needs at least Sphinx 
v5.0; it therefore cannot be built with this version.Jan 14 15:49:17 
Makefile:76: recipe for target 'html' failedJan 14 15:49:17 make: *** 
[html] Error 2Jan 14 15:49:18 ==sphinx checks... 
[FAILED]=== {code}
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34063) When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost

2024-01-14 Thread Yang LI (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806520#comment-17806520
 ] 

Yang LI commented on FLINK-34063:
-

Ok [~dmvk] , I'll do it

> When snapshot compression is enabled, rescaling of a source operator leads to 
> some splits getting lost
> --
>
> Key: FLINK-34063
> URL: https://issues.apache.org/jira/browse/FLINK-34063
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0, 1.18.1
> Environment: Can be reproduced in any environment. The most important 
> thing is to enable snapshot compression.
>Reporter: Ivan Burmistrov
>Assignee: David Morávek
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.18.2
>
> Attachments: image-2024-01-11-16-27-09-066.png, 
> image-2024-01-11-16-30-47-466.png
>
>
> h2. Backstory
> We've been experimenting with Autoscaling on the Flink 1.18 and faced a 
> pretty nasty bug. 
> The symptoms on our production system were as following. After a while after 
> deploying a job with autoscaler it started accumulating Kafka lag, and this 
> could only be observed via external lag measurement - from inside Flink 
> (measured by
> {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag was OK:
> !image-2024-01-11-16-27-09-066.png|width=887,height=263!
> After some digging, it turned out that the job has lost some Kafka partitions 
> - i.e. it stopped consuming from them, “forgot” about their existence. That’s 
> why from the Flink’s perspective everything was fine - the lag was growing on 
> the partitions Flink no longer knew about.
> This was visible on a metric called “Assigned partitions” 
> (KafkaSourceReader_KafkaConsumer_assigned_partitions):
> !image-2024-01-11-16-30-47-466.png|width=1046,height=254!
> We see on the chart that the job used to know about 20 partitions, and then 
> this number got dropped to 16.
> This drop has been quickly connected to the job’s scaling events. Or, more 
> precisely, to the scaling of the source operator - with almost 100% 
> probability any scaling of the source operator led to partition loss.
> h2. Investigation
> We've conducted the investigation. We use the latest Kubernetes operator and 
> deploy jobs with Native Kubernetes.
> The reproducing scenario we used for investigation:
>  * Launch a job with source operator parallelism = 4, enable DEBUG logging
>  * Wait until it takes the first checkpoint
>  * Scale-up the source operator to say 5 (no need to wait for autoscaling, it 
> can be done via Flink UI)
>  * Wait until the new checkpoint is taken
>  * Scale-down the source operator to 3
> These simple actions with almost 100% probability led to some partitions 
> getting lost.
> After that we've downloaded all the logs and inspected them. Noticed these 
> strange records in logs:
> {code:java}
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring
>  state for 4 split(s) to reader.","service_name":"data-beaver"} 
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding
>  split(s) to reader: 
> [
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
> StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code}
> We see that some task is being restored with 4 splits; however, the actual 
> splits contain duplicates - in reality only 2 unique partitions have been added 
> ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}).
> Digging into the code and the logs a bit more, log lines like this started 
> looking suspicious:
>  
> {code:java}
> {"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG",
>  "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state 
> SubtaskState{operatorStateFromBackend=StateObjectCollection{ 
> [OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
>  244], distributionMode=SPLIT_DISTRIBUTE}}, 
> 

Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-14 Thread via GitHub


zhuzhurk commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1451774802


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##
@@ -140,7 +140,7 @@ public StreamGraph(
 ExecutionConfig executionConfig,
 CheckpointConfig checkpointConfig,
 SavepointRestoreSettings savepointRestoreSettings) {
-this.jobConfiguration = checkNotNull(jobConfiguration);
+this.jobConfiguration = new Configuration(jobConfiguration);

Review Comment:
   Maybe `new Configuration(checkNotNull(jobConfiguration));`?



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java:
##
@@ -446,16 +446,20 @@ void testBufferTimeoutEnabled() {
 void testBufferTimeoutDisabled() {
 Configuration config = new Configuration();
 config.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, false);
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);

Review Comment:
   Not sure why the config is passed in here? It will be used in 
`env.configure()` right after.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-14 Thread via GitHub


zhuzhurk commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1451772433


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java:
##
@@ -82,7 +83,8 @@ public void setUp() {
 @Test
 public void testImmediateCheckpointing() throws Exception {
 env.setRestartStrategy(RestartStrategies.noRestart());
-env.enableCheckpointing(Long.MAX_VALUE - 1);
+env.enableCheckpointing(
+Duration.ofNanos(Long.MAX_VALUE /* max allowed by FLINK 
*/).toMillis());

Review Comment:
   I see. What confused me is that it changes `Long.MAX_VALUE - 1` to 
`Long.MAX_VALUE`. I think these two numbers do not make an actual difference, 
though.
   Besides that, I can see other usages of `enableCheckpointing(...)` which 
pass in a large number, e.g. 
`CheckpointAfterAllTasksFinishedITCase#testRestoreAfterSomeTasksFinished()` and 
`RestoreUpgradedJobITCase#runOriginalJob()`.
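   For context, a quick arithmetic check of the two variants under discussion 
(plain Java, independent of the test itself). Both intervals are far beyond any 
test duration; they differ only because one value is interpreted as nanoseconds 
first:
   
   ```java
   import java.time.Duration;
   
   public class CheckpointIntervalCheck {
       public static void main(String[] args) {
           long oldIntervalMs = Long.MAX_VALUE - 1;                          // 9223372036854775806 ms
           long newIntervalMs = Duration.ofNanos(Long.MAX_VALUE).toMillis(); // 9223372036854 ms
           System.out.println(oldIntervalMs + " ms vs " + newIntervalMs + " ms");
       }
   }
   ```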



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]

2024-01-14 Thread via GitHub


jeyhunkarimov commented on PR #23470:
URL: https://github.com/apache/flink/pull/23470#issuecomment-189017

   Hi @JingGe @xuyangzhong, thanks for your reviews. I updated the PR addressing 
your comments. I think the CI failure is not related to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]

2024-01-14 Thread via GitHub


jeyhunkarimov commented on code in PR #23470:
URL: https://github.com/apache/flink/pull/23470#discussion_r1451764950


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala:
##
@@ -50,14 +51,23 @@ class StreamCommonSubGraphBasedOptimizer(planner: 
StreamPlanner)
 val tableConfig = planner.getTableConfig
 // build RelNodeBlock plan
 val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, 
tableConfig)
+val miniBatchRequirementChecker = {
+  (node: RelNode) =>
+node.isInstanceOf[Filter] ||

Review Comment:
   Hi @xuyangzhong, thanks for your comment. Good point. 
   There are two main reasons why I think we should not move this logic anywhere 
other than `StreamCommonSubGraphBasedOptimizer`:
   
   - The logic changes slightly if we move it to `MiniBatchIntervalInferRule`. 
   In `MiniBatchIntervalInferRule` the `miniBatchEnabled` check is computed from 
the global configuration:
   
   ```
   val miniBatchEnabled = 
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
   ```
   
   Therefore, it is deterministic. If we computed the mini-batch skipping 
logic in `MiniBatchIntervalInferRule` instead, the result would not be 
deterministic, because the optimizer invokes `MiniBatchIntervalInferRule::onMatch` 
with different parts of the query plan (not necessarily only with the root of the 
query plan).
   
   - Also, many parts of the code decide whether mini-batch is enabled by 
checking the global configuration variable. Therefore, it might be better to 
unset the global configuration inside `StreamCommonSubGraphBasedOptimizer` and 
restore the original configuration value at the end. I used `try...finally` for 
that (see the sketch below). WDYT?
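   For the second point, a minimal sketch of the `try...finally` override being 
described (written in Java for illustration; the actual optimizer code is Scala 
and the surrounding logic differs):
   
   ```java
   import org.apache.flink.table.api.TableConfig;
   import org.apache.flink.table.api.config.ExecutionConfigOptions;
   
   // Sketch only: temporarily disable the global mini-batch flag and always restore it.
   final class MiniBatchOverride {
       static void runWithMiniBatchDisabled(TableConfig tableConfig, Runnable optimization) {
           boolean original =
                   tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
           tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false);
           try {
               optimization.run(); // the part of the optimization that should skip mini-batch
           } finally {
               // restore the original value even if the optimization throws
               tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, original);
           }
       }
   }
   ```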



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]

2024-01-14 Thread via GitHub


jeyhunkarimov commented on code in PR #23470:
URL: https://github.com/apache/flink/pull/23470#discussion_r1451763272


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala:
##
@@ -50,14 +51,23 @@ class StreamCommonSubGraphBasedOptimizer(planner: 
StreamPlanner)
 val tableConfig = planner.getTableConfig
 // build RelNodeBlock plan
 val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, 
tableConfig)
+val miniBatchRequirementChecker = {
+  (node: RelNode) =>
+node.isInstanceOf[Filter] ||

Review Comment:
   Agreed, done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]

2024-01-14 Thread via GitHub


jeyhunkarimov commented on code in PR #23470:
URL: https://github.com/apache/flink/pull/23470#discussion_r1451763251


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala:
##
@@ -50,14 +51,23 @@ class StreamCommonSubGraphBasedOptimizer(planner: 
StreamPlanner)
 val tableConfig = planner.getTableConfig
 // build RelNodeBlock plan
 val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, 
tableConfig)
+val miniBatchRequirementChecker = {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]

2024-01-14 Thread via GitHub


jeyhunkarimov commented on code in PR #23470:
URL: https://github.com/apache/flink/pull/23470#discussion_r1451763129


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala:
##
@@ -50,14 +51,23 @@ class StreamCommonSubGraphBasedOptimizer(planner: 
StreamPlanner)
 val tableConfig = planner.getTableConfig
 // build RelNodeBlock plan
 val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, 
tableConfig)
+val miniBatchRequirementChecker = {
+  (node: RelNode) =>
+node.isInstanceOf[Filter] ||
+node.isInstanceOf[Project] ||
+node.isInstanceOf[TableScan] ||
+(node.isInstanceOf[LogicalUnion] && 
node.asInstanceOf[LogicalUnion].all)
+}
 // infer trait properties for sink block
 sinkBlocks.foreach {
   sinkBlock =>
 // don't require update before by default
 sinkBlock.setUpdateBeforeRequired(false)
-
 val miniBatchInterval: MiniBatchInterval =
-  if 
(tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)) {
+  if (
+
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED) &&
+!sinkBlock.containsAll(miniBatchRequirementChecker)

Review Comment:
   Agreed. Also, I moved the mini-batch skipping logic from `RelNodeBlock` to 
`StreamCommonSubGraphBasedOptimizer::shouldSkipMiniBatch`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]

2024-01-14 Thread via GitHub


jeyhunkarimov commented on code in PR #23470:
URL: https://github.com/apache/flink/pull/23470#discussion_r1451762837


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/MiniBatchOptimizationTest.java:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.analyze;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.planner.delegation.StreamPlanner;
+import org.apache.flink.table.planner.plan.optimize.RelNodeBlock;
+import 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer;
+import org.apache.flink.table.planner.plan.trait.MiniBatchIntervalTrait;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.apache.calcite.rel.RelNode;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for enabling/disabling mini-batch assigner operator based on query 
plan. The optimization is
+ * performed in {@link StreamCommonSubGraphBasedOptimizer}.
+ */
+@RunWith(Parameterized.class)
+public class MiniBatchOptimizationTest extends TableTestBase {
+
+private final StreamTableTestUtil util = 
streamTestUtil(TableConfig.getDefault());
+private final StreamTableEnvironment streamTableEnv =
+StreamTableEnvironment.create(util.getStreamEnv());
+
+@Parameterized.Parameter public boolean isMiniBatchEnabled;
+
+@Parameterized.Parameter(1)
+public long miniBatchLatency;
+
+@Parameterized.Parameter(2)
+public long miniBatchSize;
+
+@Before
+public void before() {
+streamTableEnv
+.getConfig()
+.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
isMiniBatchEnabled)
+.set(
+
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+Duration.ofSeconds(miniBatchLatency))
+.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 
miniBatchSize);
+streamTableEnv.executeSql(
+"CREATE TABLE MyTableA (\n"
++ "  a BIGINT,\n"
++ "  b INT NOT NULL,\n"
++ "  c VARCHAR,\n"
++ "  d BIGINT\n"
++ ") WITH (\n"
++ "  'connector' = 'values',\n"
++ "  'bounded' = 'false')");
+streamTableEnv.executeSql(
+"CREATE TABLE MyTableB (\n"
++ "  a BIGINT,\n"
++ "  b INT NOT NULL,\n"
++ "  c VARCHAR,\n"
++ "  d BIGINT\n"
++ ") WITH (\n"
++ "  'connector' = 'values',\n"
++ "  'bounded' = 'false')");
+}
+
+private boolean containsMiniBatch(String sql) {
+final Table result = streamTableEnv.sqlQuery(sql);
+RelNode relNode = TableTestUtil.toRelNode(result);
+StreamPlanner planner =
+(StreamPlanner) ((TableEnvironmentImpl) 
streamTableEnv).getPlanner();
+StreamCommonSubGraphBasedOptimizer optimizer =
+new StreamCommonSubGraphBasedOptimizer(planner);
+Seq nodeSeq =
+
JavaConverters.asScalaIteratorConverter(Arrays.asList(relNode).iterator())
+.asScala()
+.toSeq();
+Seq blockSeq = optimizer.doOptimize(nodeSeq);
+List blockList = 

[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Khanh Vu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Khanh Vu updated FLINK-34076:
-
Description: 
The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused the kinesis sink to fail to create when using the Table API, as required 
classes from `flink-connector-base` are not loaded at runtime.

E.g. with only the following dependency in pom.xml:
{code:java}
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>${flink.connector.kinesis.version}</version>
        </dependency>
{code}

and a minimal job definition:

{code:java}
public static void main(String[] args) throws Exception {
// create data stream environment
StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(sEnv);

Schema a = Schema.newBuilder().column("a", 
DataTypes.STRING()).build();
TableDescriptor descriptor =
TableDescriptor.forConnector("kinesis")
.schema(a)
.format("json")
.build();
tEnv.createTemporaryTable("sinkTable", descriptor);

tEnv.executeSql("CREATE TABLE sinkTable " + 
descriptor.toString()).print();
}
{code}

the following exception will be thrown:
{code:java}
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
at 
jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) 
~[?:?]
at 
jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
 ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
... 28 more
{code}

The fix is to explicitly specify `flink-connector-base` as a dependency of the 
project:

{code:java}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>${flink.connector.kinesis.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
{code}

In general, `flink-connector-base` should be pulled in by default when pulling 
in the kinesis connector; the current separation adds unnecessary hassle to 
using the connector.

  was:
The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused kinesis sink failing to create when using Table API as required classes 
from `flink-connector-base` are not loaded in runtime.

E.g. with following depenency only in pom.xml
{code:java}
        
            org.apache.flink
            flink-connector-kinesis
            ${flink.connector.kinesis.version}
        
{code}

and a minimal job definition:

{code:java}
public static void main(String[] args) throws Exception {
// create data stream environment
StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(sEnv);

Schema a = Schema.newBuilder().column("a", 
DataTypes.STRING()).build();
TableDescriptor descriptor =
TableDescriptor.forConnector("kinesis")
.schema(a)
.format("json")
.option("stream", "abc")
.option("aws.region", 
"eu-central-1")
.build();
tEnv.createTemporaryTable("sinkTable", descriptor);

tEnv.executeSql("CREATE TABLE sinkTable " + 
descriptor.toString()).print();
}
{code}

following exception will be thrown:
{code:java}
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
at 
jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) 
~[?:?]
at 
jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
 ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
... 28 more
{code}

The fix is to explicitly 

[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Khanh Vu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Khanh Vu updated FLINK-34076:
-
Description: 
The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused kinesis sink failing to create when using Table API as required classes 
from `flink-connector-base` are not loaded in runtime.

E.g. with following depenency only in pom.xml
{code:java}
        
            org.apache.flink
            flink-connector-kinesis
            ${flink.connector.kinesis.version}
        
{code}

and a minimal job definition:

{code:java}
public static void main(String[] args) throws Exception {
// create data stream environment
StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(sEnv);

Schema a = Schema.newBuilder().column("a", 
DataTypes.STRING()).build();
TableDescriptor descriptor =
TableDescriptor.forConnector("kinesis")
.schema(a)
.format("json")
.option("stream", "abc")
.option("aws.region", 
"eu-central-1")
.build();
tEnv.createTemporaryTable("sinkTable", descriptor);

tEnv.executeSql("CREATE TABLE sinkTable " + 
descriptor.toString()).print();
}
{code}

following exception will be thrown:
{code:java}
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
at 
jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) 
~[?:?]
at 
jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
 ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
... 28 more
{code}

The fix is to explicitly specify `flink-connector-base` as dependency of the 
project:

{code:java}

org.apache.flink
flink-connector-kinesis
${flink.connector.kinesis.version}


org.apache.flink
flink-connector-base
${flink.version}
provided

{code}

In general, `flink-connector-base` should be pulled in by default when pulling 
in the kinesis connector, the current separation adds unnecessary hassle to use 
the connector.

  was:
The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused kinesis sink failing to create when using Table API as required classes 
from `flink-connector-base` are not loaded in runtime.

E.g. with following depenency only in pom.xml
{code:java}
        
            org.apache.flink
            flink-connector-kinesis
            ${flink.connector.kinesis.version}
        
{code}

following exception will be thrown:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' 
can only be used as a source. It cannot be used as a sink.
at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
{code}

The fix is to explicitly specify `flink-connector-base` as dependency of the 
project:

{code:java}

org.apache.flink
flink-connector-kinesis
${flink.connector.kinesis.version}


org.apache.flink
flink-connector-base
${flink.version}
provided

{code}

In general, `flink-connector-base` should be pulled in by default when pulling 
in the connector, the current separation adds unnecessary hassle to use the 
connector.


> flink-connector-base missing fails kinesis table sink to create
> ---
>
> Key: FLINK-34076
> URL: https://issues.apache.org/jira/browse/FLINK-34076
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / 

Re: [PR] [FLINK-33741]Expose Rocksdb Histogram statistics in Flink metrics [flink]

2024-01-14 Thread via GitHub


flinkbot commented on PR #24050:
URL: https://github.com/apache/flink/pull/24050#issuecomment-1890987057

   
   ## CI report:
   
   * 64ca2d0a5711ffa6a5121db073fbeceaba9178b6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Khanh Vu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Khanh Vu updated FLINK-34076:
-
Description: 
The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused kinesis sink failing to create when using Table API as required classes 
from `flink-connector-base` are not loaded in runtime.

E.g. with following depenency only in pom.xml
{code:java}
        
            org.apache.flink
            flink-connector-kinesis
            ${flink.connector.kinesis.version}
        
{code}

following exception will be thrown:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' 
can only be used as a source. It cannot be used as a sink.
at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
{code}

The fix is to explicitly specify `flink-connector-base` as dependency of the 
project:

{code:java}

org.apache.flink
flink-connector-kinesis
${flink.connector.kinesis.version}


org.apache.flink
flink-connector-base
${flink.version}
provided

{code}

In general, `flink-connector-base` should be pulled in by default when pulling 
in the connector, the current separation adds unnecessary hassle to use the 
connector.

  was:
The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused kinesis sink failing to create when using Table API as required classes 
from `flink-connector-base` are not loaded in execution.

E.g. with following depenency only in pom.xml
{code:java}
        
            org.apache.flink
            flink-connector-kinesis
            ${flink.connector.kinesis.version}
        
{code}

following exception will be thrown:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' 
can only be used as a source. It cannot be used as a sink.
at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
{code}

The fix is to explicitly specify `flink-connector-base` as dependency of the 
project:

{code:java}

org.apache.flink
flink-connector-kinesis
${flink.connector.kinesis.version}


org.apache.flink
flink-connector-base
${flink.version}
provided

{code}

In general, `flink-connector-base` should be pulled in by default when pulling 
in the connector, the current separation adds unnecessary hassle to use the 
connector.


> flink-connector-base missing fails kinesis table sink to create
> ---
>
> Key: FLINK-34076
> URL: https://issues.apache.org/jira/browse/FLINK-34076
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Khanh Vu
>Priority: Major
>
> The 
> [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
>  which stops bundling `flink-connector-base` with `flink-connector-kinesis` 
> has caused kinesis sink failing to create when using Table API as required 
> classes from `flink-connector-base` are not loaded in runtime.
> E.g. with following depenency only in pom.xml
> {code:java}
>         
>             org.apache.flink
>             flink-connector-kinesis
>             ${flink.connector.kinesis.version}
>         
> {code}
> following exception will be thrown:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Connector 
> 'kinesis' can only be used as a source. It cannot be used as a sink.
> at 
> org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
> at 
> org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
> at 
> 

[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Khanh Vu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Khanh Vu updated FLINK-34076:
-
Description: 
The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused kinesis sink failing to create when using Table API as required classes 
from `flink-connector-base` are not loaded in execution.

E.g. with following depenency only in pom.xml
{code:java}
        
            org.apache.flink
            flink-connector-kinesis
            ${flink.connector.kinesis.version}
        
{code}

following exception will be thrown:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' 
can only be used as a source. It cannot be used as a sink.
at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
{code}

The fix is to explicitly specify `flink-connector-base` as dependency of the 
project:

{code:java}

org.apache.flink
flink-connector-kinesis
${flink.connector.kinesis.version}


org.apache.flink
flink-connector-base
${flink.version}
provided

{code}

In general, `flink-connector-base` should be pulled in by default when pulling 
in the connector, the current separation adds unnecessary hassle to use the 
connector.

  was:
The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused kinesis sink failing to create when using Table API as required classes 
from `flink-connector-base` are not loaded in execution.

{{E.g. with following depenency only in pom.xml}}
{{```}}
Caused by: org.apache.flink.table.api.ValidationException: Connector 
'kinesis' can only be used as a source. It cannot be used as a sink.}}
{{at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)}}
{{at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)}}
{{at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)}}
{{... 20 more
{{```}}

following exception will be thrown:
```
{{Caused by: org.apache.flink.table.api.ValidationException: Connector 
'kinesis' can only be used as a source. It cannot be used as a sink.
at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
... 20 more}}
```

Workaround is to explicitly specify `flink-connector-base` as dependency of the 
project:

```
{{
org.apache.flink
flink-connector-kinesis
${flink.connector.kinesis.version}


org.apache.flink
flink-connector-base
${flink.version}
provided
}}
```

In general, `flink-connector-base` should be pulled in by default when pulling 
in the connector, the current separation adds unnecessary hassle to use the 
connector.


> flink-connector-base missing fails kinesis table sink to create
> ---
>
> Key: FLINK-34076
> URL: https://issues.apache.org/jira/browse/FLINK-34076
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Khanh Vu
>Priority: Major
>
> The 
> [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
>  which stops bundling `flink-connector-base` with `flink-connector-kinesis` 
> has caused kinesis sink failing to create when using Table API as required 
> classes from `flink-connector-base` are not loaded in execution.
> E.g. with following depenency only in pom.xml
> {code:java}
>         
>             org.apache.flink
>             flink-connector-kinesis
>             ${flink.connector.kinesis.version}
>         
> {code}
> following exception will be thrown:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Connector 
> 'kinesis' can only be used as a source. It cannot be used as a sink.
> at 
> org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
> at 
> 

[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Khanh Vu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Khanh Vu updated FLINK-34076:
-
Description: 
The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused kinesis sink failing to create when using Table API as required classes 
from `flink-connector-base` are not loaded in execution.

{{E.g. with following depenency only in pom.xml}}
{{```}}
Caused by: org.apache.flink.table.api.ValidationException: Connector 
'kinesis' can only be used as a source. It cannot be used as a sink.}}
{{at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)}}
{{at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)}}
{{at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)}}
{{... 20 more
{{```}}

following exception will be thrown:
```
{{Caused by: org.apache.flink.table.api.ValidationException: Connector 
'kinesis' can only be used as a source. It cannot be used as a sink.
at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
... 20 more}}
```

Workaround is to explicitly specify `flink-connector-base` as dependency of the 
project:

```
{{
org.apache.flink
flink-connector-kinesis
${flink.connector.kinesis.version}


org.apache.flink
flink-connector-base
${flink.version}
provided
}}
```

In general, `flink-connector-base` should be pulled in by default when pulling 
in the connector, the current separation adds unnecessary hassle to use the 
connector.

  was:
The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused kinesis sink failing to create when using Table API as required classes 
from `flink-connector-base` are not loaded in execution.

E.g. with following depenency only in pom.xml
```
{{Caused by: org.apache.flink.table.api.ValidationException: Connector 
'kinesis' can only be used as a source. It cannot be used as a sink.
at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
... 20 more}}
```

following exception will be thrown:
```
{{Caused by: org.apache.flink.table.api.ValidationException: Connector 
'kinesis' can only be used as a source. It cannot be used as a sink.
at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
... 20 more}}
```

Workaround is to explicitly specify `flink-connector-base` as dependency of the 
project:

```
{{
org.apache.flink
flink-connector-kinesis
${flink.connector.kinesis.version}


org.apache.flink
flink-connector-base
${flink.version}
provided
}}
```

In general, `flink-connector-base` should be pulled in by default when pulling 
in the connector, the current separation adds unnecessary hassle to use the 
connector.


> flink-connector-base missing fails kinesis table sink to create
> ---
>
> Key: FLINK-34076
> URL: https://issues.apache.org/jira/browse/FLINK-34076
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Khanh Vu
>Priority: Major
>
> The 
> [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
>  which stops bundling `flink-connector-base` with `flink-connector-kinesis` 
> has caused kinesis sink failing to create when using Table API as required 
> classes from `flink-connector-base` are not loaded in execution.
> {{E.g. with following depenency only in pom.xml}}
> {{```}}
> Caused by: org.apache.flink.table.api.ValidationException: Connector 
> 'kinesis' can only be used as a source. It cannot be used as a sink.}}
> {{at 
> 

[jira] [Created] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create

2024-01-14 Thread Khanh Vu (Jira)
Khanh Vu created FLINK-34076:


 Summary: flink-connector-base missing fails kinesis table sink to 
create
 Key: FLINK-34076
 URL: https://issues.apache.org/jira/browse/FLINK-34076
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Affects Versions: aws-connector-4.2.0
Reporter: Khanh Vu


The 
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
 which stops bundling `flink-connector-base` with `flink-connector-kinesis` has 
caused kinesis sink failing to create when using Table API as required classes 
from `flink-connector-base` are not loaded in execution.

E.g. with following depenency only in pom.xml
```
{{Caused by: org.apache.flink.table.api.ValidationException: Connector 
'kinesis' can only be used as a source. It cannot be used as a sink.
at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
... 20 more}}
```

following exception will be thrown:
```
{{Caused by: org.apache.flink.table.api.ValidationException: Connector 
'kinesis' can only be used as a source. It cannot be used as a sink.
at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
... 20 more}}
```

Workaround is to explicitly specify `flink-connector-base` as dependency of the 
project:

```
{{
org.apache.flink
flink-connector-kinesis
${flink.connector.kinesis.version}


org.apache.flink
flink-connector-base
${flink.version}
provided
}}
```

In general, `flink-connector-base` should be pulled in by default when pulling 
in the connector, the current separation adds unnecessary hassle to use the 
connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33741]Expose Rocksdb Histogram statistics in Flink metrics [flink]

2024-01-14 Thread via GitHub


Myasuka commented on code in PR #24050:
URL: https://github.com/apache/flink/pull/24050#discussion_r1451751194


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java:
##
@@ -302,6 +303,20 @@ public class RocksDBNativeMetricOptions implements 
Serializable {
 .withDescription(
 "Monitor the duration of writer requiring to wait 
for compaction or flush to finish in RocksDB.");
 
+public static final ConfigOption MONITOR_DB_GET =
+ConfigOptions.key("state.backend.rocksdb.metrics.db_get")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Monitor the metric that measures the time taken 
for a RocksDB database to perform a read operation in microseconds");
+
+public static final ConfigOption MONITOR_DB_WRITE =
+ConfigOptions.key("state.backend.rocksdb.metrics.db_write")

Review Comment:
   Same here, I think `state.backend.rocksdb.metrics.db-write` looks better.



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java:
##
@@ -231,4 +252,88 @@ public void update() {
 setStatistics(this);
 }
 }
+
+/**
+ * A gauge which periodically pulls a RocksDB statistics-based native 
statistic metric for the database.

Review Comment:
   It's not a gauge here.



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java:
##
@@ -231,4 +252,88 @@ public void update() {
 setStatistics(this);
 }
 }
+
+/**
+ * A gauge which periodically pulls a RocksDB statistics-based native 
statistic metric for the database.
+ */
+class RocksDBNativeHistogramStatisticsMetricView extends RocksDBNativeView 
implements Histogram {
+private final HistogramType histogramType;
+
+private HistogramData histogramData;
+
+private RocksDBNativeHistogramStatisticsMetricView(HistogramType 
histogramType) {
+this.histogramType = histogramType;
+}
+
+void setValue(HistogramData histogramData) {
+this.histogramData = histogramData;
+}
+
+@Override
+public void update() {
+setHistogramStatistics(this);
+}
+
+@Override
+public void update(long value) {
+}
+
+@Override
+public long getCount() {
+return histogramData == null ? 0 : histogramData.getCount();
+}
+
+@Override
+public HistogramStatistics getStatistics() {
+return new RocksDBNativeHistogram();
+}
+
+class RocksDBNativeHistogram extends HistogramStatistics {
+
+@Override
+public double getQuantile(double quantile) {
+if (histogramData == null) {
+return 0;
+} else if (quantile == 0.5) {
+return histogramData.getMedian();
+} else if (quantile == 0.95) {
+return histogramData.getPercentile95();
+} else if (quantile == 0.99) {
+return histogramData.getPercentile99();
+} else {
+return 0;
+}
+}
+
+@Override
+public long[] getValues() {
+return new long[0];
+}
+
+@Override
+public int size() {
+return 0;

Review Comment:
   Why not use `histogramData.getCount()`?
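   A minimal sketch of what this suggestion could look like (assuming the 
surrounding `RocksDBNativeHistogram` class from the diff above; whether `size()` 
should really mirror the total count is for the PR author to decide):
   
   ```java
   @Override
   public int size() {
       // Report the number of recorded values from the native histogram data
       // instead of a constant 0; histogramData may not have been set yet.
       return histogramData == null ? 0 : (int) histogramData.getCount();
   }
   ```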



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java:
##
@@ -302,6 +303,20 @@ public class RocksDBNativeMetricOptions implements 
Serializable {
 .withDescription(
 "Monitor the duration of writer requiring to wait 
for compaction or flush to finish in RocksDB.");
 
+public static final ConfigOption MONITOR_DB_GET =
+ConfigOptions.key("state.backend.rocksdb.metrics.db_get")

Review Comment:
   Please follow the metrics options style in this class, use 
`state.backend.rocksdb.metrics.db-get`.



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java:
##
@@ -231,4 +252,88 @@ public void update() {
 setStatistics(this);
 }
 }
+
+/**
+ * A gauge which periodically pulls a RocksDB statistics-based native 
statistic metric for the database.
+ */
+class RocksDBNativeHistogramStatisticsMetricView extends RocksDBNativeView 
implements Histogram {
+private final HistogramType 

Re: [PR] [FLINK-34072][scripts] Replace java command to JAVA_RUN in config.sh [flink]

2024-01-14 Thread via GitHub


flinkbot commented on PR #24085:
URL: https://github.com/apache/flink/pull/24085#issuecomment-1890975682

   
   ## CI report:
   
   * 6b6151ac82aa466850d93eefd7951e1fd65ca01b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34072) Use JAVA_RUN in shell scripts

2024-01-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34072:
---
Labels: pull-request-available  (was: )

> Use JAVA_RUN in shell scripts
> -
>
> Key: FLINK-34072
> URL: https://issues.apache.org/jira/browse/FLINK-34072
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Scripts
>Reporter: Yun Tang
>Assignee: Yu Chen
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> We should call {{JAVA_RUN}} in all cases when we launch the {{java}} command; 
> otherwise we might not be able to run {{java}} correctly if JAVA_HOME is not 
> set, leading to errors such as:
> {code:java}
> flink-1.19-SNAPSHOT-bin/flink-1.19-SNAPSHOT/bin/config.sh: line 339: > 17 : 
> syntax error: operand expected (error token is "> 17 ")
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34072][scripts] Replace java command to JAVA_RUN in config.sh [flink]

2024-01-14 Thread via GitHub


yuchen-ecnu opened a new pull request, #24085:
URL: https://github.com/apache/flink/pull/24085

   ## What is the purpose of the change
   
   Replace the `java` command with `JAVA_RUN` in scripts.
   
   
   ## Brief change log
   
   - Replace `java` with `JAVA_RUN` in config.sh
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33057][Checkpointing] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]

2024-01-14 Thread via GitHub


Zakelly commented on PR #23509:
URL: https://github.com/apache/flink/pull/23509#issuecomment-1890974314

   > Thanks for the update. Current PR LGTM % the conflict. Could you rebase 
all commits onto master to resolve the conflict? BTW, please remember to 
rename the title with the component tag.
   
   Already done, thanks @masteryhx 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33057] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]

2024-01-14 Thread via GitHub


Myasuka commented on code in PR #23509:
URL: https://github.com/apache/flink/pull/23509#discussion_r1451744345


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java:
##
@@ -72,6 +73,7 @@ public MemoryBackendCheckpointStorageAccess(
 JobID jobId,
 @Nullable Path checkpointsBaseDirectory,
 @Nullable Path defaultSavepointLocation,
+boolean createCheckpointSubDirs,

Review Comment:
   I have no strong objection here. We can keep the current design.
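
   For readers following the discussion, here is a hedged sketch of the layout that the new `createCheckpointSubDirs` flag controls, based on Flink's documented `<checkpoint-dir>/<job-id>/chk-<n>` structure; the helper class and method below are invented for illustration and are not the PR's code:

   ```java
   import org.apache.flink.api.common.JobID;
   import org.apache.flink.core.fs.Path;

   /** Illustrative only: shows the directory layout implied by the flag, not the PR's code. */
   public final class CheckpointDirLayoutExample {

       /**
        * With createCheckpointSubDirs=true the checkpoint goes to <base>/<jobId>/chk-<id>
        * (the current default layout); with false it goes directly to <base>/chk-<id>.
        */
       public static Path resolveCheckpointDir(
               Path base, JobID jobId, boolean createCheckpointSubDirs, long checkpointId) {
           Path parent = createCheckpointSubDirs ? new Path(base, jobId.toString()) : base;
           return new Path(parent, "chk-" + checkpointId);
       }

       private CheckpointDirLayoutExample() {}
   }
   ```

   With the flag set to false, checkpoints land directly under the configured base directory instead of a per-job subdirectory, which is the behaviour the PR makes optional.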



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34072) Use JAVA_RUN in shell scripts

2024-01-14 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806474#comment-17806474
 ] 

Yun Tang commented on FLINK-34072:
--

[~Yu Chen] Already assigned, please go ahead.

> Use JAVA_RUN in shell scripts
> -
>
> Key: FLINK-34072
> URL: https://issues.apache.org/jira/browse/FLINK-34072
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Scripts
>Reporter: Yun Tang
>Assignee: Yu Chen
>Priority: Minor
> Fix For: 1.19.0
>
>
> We should call {{JAVA_RUN}} in all cases when we launch the {{java}} command, 
> otherwise we might not be able to run {{java}} if JAVA_HOME is not set.
> such as:
> {code:java}
> flink-1.19-SNAPSHOT-bin/flink-1.19-SNAPSHOT/bin/config.sh: line 339: > 17 : 
> syntax error: operand expected (error token is "> 17 ")
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34072) Use JAVA_RUN in shell scripts

2024-01-14 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-34072:


Assignee: Yu Chen

> Use JAVA_RUN in shell scripts
> -
>
> Key: FLINK-34072
> URL: https://issues.apache.org/jira/browse/FLINK-34072
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Scripts
>Reporter: Yun Tang
>Assignee: Yu Chen
>Priority: Minor
> Fix For: 1.19.0
>
>
> We should call {{JAVA_RUN}} in all cases when we launch the {{java}} command, 
> otherwise we might not be able to run {{java}} if JAVA_HOME is not set.
> such as:
> {code:java}
> flink-1.19-SNAPSHOT-bin/flink-1.19-SNAPSHOT/bin/config.sh: line 339: > 17 : 
> syntax error: operand expected (error token is "> 17 ")
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33839) Deploy Python artifacts to PyPI (need PMC role)

2024-01-14 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge reassigned FLINK-33839:
---

Assignee: (was: Jing Ge)

> Deploy Python artifacts to PyPI (need PMC role)
> ---
>
> Key: FLINK-33839
> URL: https://issues.apache.org/jira/browse/FLINK-33839
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jing Ge
>Priority: Major
>
> The release manager should create a PyPI account and ask the PMC to add this account 
> to the pyflink collaborator list with the Maintainer role (the PyPI admin account 
> info can be found here; NOTE: only visible to PMC members) in order to deploy the 
> Python artifacts to PyPI. The artifacts can be uploaded using 
> twine ([https://pypi.org/project/twine/]). To install twine, just run:
> {code:java}
> pip install --upgrade twine==1.12.0
> {code}
> Download the Python artifacts from dist.apache.org and upload them to pypi.org:
> {code:java}
> svn checkout 
> https://dist.apache.org/repos/dist/dev/flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
> cd flink-${RELEASE_VERSION}-rc${RC_NUM}
>  
> cd python
>  
> #uploads wheels
> for f in *.whl; do twine upload --repository-url 
> https://upload.pypi.org/legacy/ $f $f.asc; done
>  
> #upload source packages
> twine upload --repository-url https://upload.pypi.org/legacy/ 
> apache-flink-libraries-${RELEASE_VERSION}.tar.gz 
> apache-flink-libraries-${RELEASE_VERSION}.tar.gz.asc
>  
> twine upload --repository-url https://upload.pypi.org/legacy/ 
> apache-flink-${RELEASE_VERSION}.tar.gz 
> apache-flink-${RELEASE_VERSION}.tar.gz.asc
> {code}
> If the upload failed or was incorrect for some reason (e.g. a network transmission 
> problem), you need to delete the uploaded release package of the same version 
> (if it exists), rename the artifact to 
> {{{}apache-flink-${RELEASE_VERSION}.post0.tar.gz{}}}, and then re-upload.
> (!) Note: re-uploading to pypi.org must be avoided as much as possible 
> because it can cause irreparable problems. If that happens, users 
> cannot install the apache-flink package by explicitly specifying the package 
> version, i.e. the command "pip install 
> apache-flink==${RELEASE_VERSION}" will fail. Instead, they have to run "pip 
> install apache-flink" or "pip install apache-flink==${RELEASE_VERSION}.post0" 
> to install the apache-flink package.
>  
> 
> h3. Expectations
>  * Python artifacts released and indexed in the 
> [PyPI|https://pypi.org/project/apache-flink/] Repository



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33840) Deploy artifacts to Maven Central Repository (need PMC role)

2024-01-14 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge reassigned FLINK-33840:
---

Assignee: (was: Jing Ge)

> Deploy artifacts to Maven Central Repository (need PMC role)
> 
>
> Key: FLINK-33840
> URL: https://issues.apache.org/jira/browse/FLINK-33840
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jing Ge
>Priority: Major
>
> Use the [Apache Nexus repository|https://repository.apache.org/] to release 
> the staged binary artifacts to the Maven Central repository. In the Staging 
> Repositories section, find the relevant release candidate orgapacheflink-XXX 
> entry and click Release. Drop all other release candidates that are not being 
> released.
> h3. Deploy source and binary releases to dist.apache.org
> Copy the source and binary releases from the dev repository to the release 
> repository at [dist.apache.org|http://dist.apache.org/] using Subversion.
> {code:java}
> $ svn move -m "Release Flink ${RELEASE_VERSION}" 
> https://dist.apache.org/repos/dist/dev/flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
>  https://dist.apache.org/repos/dist/release/flink/flink-${RELEASE_VERSION}
> {code}
> (Note: Only PMC members have access to the release repository. If you do not 
> have access, ask on the mailing list for assistance.)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

