[GitHub] [flink] KarmaGYZ commented on pull request #23415: [FLINK-33053][zookeeper] Manually remove the leader watcher after ret…

2023-09-14 Thread via GitHub


KarmaGYZ commented on PR #23415:
URL: https://github.com/apache/flink/pull/23415#issuecomment-1720713952

   @tisonkun yes, it's a valid issue: the watcher on the RM connection info 
will be removed after the JobMaster is closed. I think we can filter out the 
"resource_manager/connection_info" path.





[GitHub] [flink] xiangforever2014 commented on pull request #23253: [FLINK-32881][checkpoint] Support triggering savepoint in detach mode for CLI and dumping all pending savepoint ids by rest api

2023-09-14 Thread via GitHub


xiangforever2014 commented on PR #23253:
URL: https://github.com/apache/flink/pull/23253#issuecomment-1720713759

   @masteryhx 
   Many thanks for your comment~
   
   Here are my requirements:
   1. Support triggering a savepoint in detached mode, so the CLI process can 
return quickly and does not occupy resources for a long time.
   2. Get the status of a detached savepoint through the platform, i.e. 
whether the detached savepoint succeeded, failed, or is still being produced.
   
   To your question:
   With a non-detached savepoint, we can tell from the CLI output whether the 
savepoint was successfully triggered and completed. In detached mode, if we 
only change code in flink-clients, that is, just send the request and return, 
the current REST API gives us no way to learn the status of the savepoint we 
triggered. At minimum we need to know the savepoint id; with it, we could use 
the "/jobs/:jobid/checkpoints/details/:checkpointid" API to get the savepoint 
status.
   
   But in that case the client needs to wait for the savepoint-id message from 
the JM, which makes the client block for a while.
   
   So in this PR, I support triggering a savepoint in detached mode as follows:
   1. The client generates a savepoint id (a random UUID) and, when triggering 
a savepoint in detached mode, puts this id into the request body.
   2. When the JM receives the request, it checks the savepoint-id field; if 
the field is set, the JM uses it directly as the savepoint id instead of 
generating an id like savepoint-xx.
   
   In this way, the client does not need to wait for any message: it triggers 
the detached savepoint, generates a savepoint id, and its job is finished. If 
we want to know the status of the detached savepoint, we can use the 
dump-pending-savepoints API to get all pending savepoint ids and compare them 
against the id generated by the client, as in the sketch below.
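   A minimal sketch of the client-side flow (the "savepoint-id" field name is 
an illustrative assumption for this proposal, not the final REST schema):

   ```java
   import java.util.UUID;

   public class DetachedSavepointSketch {
       public static void main(String[] args) {
           // 1. The client generates the savepoint id up front (random UUID),
           //    so it never has to wait for the JobManager's reply.
           String savepointId = UUID.randomUUID().toString();

           // 2. The id goes into the trigger request body; the "savepoint-id"
           //    field is the new field proposed here, the directory is made up.
           String requestBody =
                   "{\"target-directory\": \"/tmp/savepoints\", "
                           + "\"savepoint-id\": \"" + savepointId + "\"}";
           System.out.println("POST /jobs/:jobid/savepoints " + requestBody);

           // 3. Later, the platform polls the dump-pending-savepoints API and
           //    looks for this id in the returned list to learn the status.
       }
   }
   ```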
   
   Hope to get your suggestion, thank you~





[GitHub] [flink] gaborgsomogyi commented on pull request #23417: [FLINK-33030][python]Add python 3.11 support

2023-09-14 Thread via GitHub


gaborgsomogyi commented on PR #23417:
URL: https://github.com/apache/flink/pull/23417#issuecomment-1720710015

   Just added wheel-building debug code to the PR, which must be reverted 
once Azure passes.





[jira] [Commented] (FLINK-33090) CheckpointsCleaner clean individual checkpoint states in parallel

2023-09-14 Thread Yi Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765458#comment-17765458
 ] 

Yi Zhang commented on FLINK-33090:
--

Yes, I would like to contribute; I will send a PR for review.

> CheckpointsCleaner clean individual checkpoint states in parallel
> -
>
> Key: FLINK-33090
> URL: https://issues.apache.org/jira/browse/FLINK-33090
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.1
>Reporter: Yi Zhang
>Priority: Major
>
> Currently CheckpointsCleaner cleans multiple checkpoints in parallel with 
> the JobManager's ioExecutor; however, each checkpoint's states are cleaned 
> sequentially. With thousands of StateObjects to clean, this can take a long 
> time on some checkpoint storage, and if it takes longer than the checkpoint 
> interval it prevents new checkpointing.
> The proposal is to use the same ioExecutor to clean up each checkpoint's 
> states in parallel as well. From my local testing, with default settings for 
> the ioExecutor thread pool and xK state files, this can reduce cleanup time 
> from 10 minutes to <1 minute. 





[jira] [Commented] (FLINK-33029) Drop python 3.7 support

2023-09-14 Thread Gabor Somogyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765457#comment-17765457
 ] 

Gabor Somogyi commented on FLINK-33029:
---

I've just double-checked, and wheel building has been fixed in the next nightly: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53251=results

> Drop python 3.7 support
> ---
>
> Key: FLINK-33029
> URL: https://issues.apache.org/jira/browse/FLINK-33029
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>






[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #23419: [BP][FLINK-33088][network] Fix NullPointerException in RemoteTierConsumerAgent for tiered storage

2023-09-14 Thread via GitHub


TanYuxin-tyx commented on code in PR #23419:
URL: https://github.com/apache/flink/pull/23419#discussion_r1326779772


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgentTest.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.TestingPartitionFileReader;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RemoteTierConsumerAgent}. */
+class RemoteTierConsumerAgentTest {
+
+    @TempDir private File tempFolder;
+
+    private String remoteStoragePath;
+
+    @BeforeEach
+    void before() {
+        remoteStoragePath = Path.fromLocalFile(tempFolder).getPath();
+    }
+
+    @Test
+    void testGetEmptyBuffer() {
+        RemoteTierConsumerAgent remoteTierConsumerAgent =
+                new RemoteTierConsumerAgent(
+                        new RemoteStorageScanner(remoteStoragePath),
+                        new TestingPartitionFileReader.Builder().build(),
+                        1024);
+        assertThat(
+                        remoteTierConsumerAgent.getNextBuffer(
+                                new TieredStoragePartitionId(new ResultPartitionID()),
+                                new TieredStorageSubpartitionId(0),
+                                0))
+                .isEmpty();
+    }
+
+    @Test
+    void testGetBuffer() {
+        int bufferSize = 10;
+        PartitionFileReader partitionFileReader =
+                new TestingPartitionFileReader.Builder()
+                        .setReadBufferSupplier(
+                                (bufferIndex, segmentId) ->
+                                        new PartitionFileReader.ReadBufferResult(
+                                                Collections.singletonList(
+                                                        BufferBuilderTestUtils.buildSomeBuffer(
+                                                                bufferSize)),
+                                                false,
+                                                null))
+                        .build();
+        RemoteTierConsumerAgent remoteTierConsumerAgent =
+                new RemoteTierConsumerAgent(
+                        new RemoteStorageScanner(remoteStoragePath), partitionFileReader, 1024);
+        Optional<Buffer> optionalBuffer =
+                remoteTierConsumerAgent.getNextBuffer(
+                        new TieredStoragePartitionId(new ResultPartitionID()),
+                        new TieredStorageSubpartitionId(0),
+                        0);
+        assertThat(optionalBuffer).isPresent();
+        assertThat(optionalBuffer.get().readableBytes()).isEqualTo(bufferSize);

Review Comment:
   OK, good catch.






[GitHub] [flink] reswqa commented on a diff in pull request #23419: [BP][FLINK-33088][network] Fix NullPointerException in RemoteTierConsumerAgent for tiered storage

2023-09-14 Thread via GitHub


reswqa commented on code in PR #23419:
URL: https://github.com/apache/flink/pull/23419#discussion_r1326775138


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgentTest.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.TestingPartitionFileReader;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RemoteTierConsumerAgent}. */
+class RemoteTierConsumerAgentTest {
+
+    @TempDir private File tempFolder;
+
+    private String remoteStoragePath;
+
+    @BeforeEach
+    void before() {
+        remoteStoragePath = Path.fromLocalFile(tempFolder).getPath();
+    }
+
+    @Test
+    void testGetEmptyBuffer() {
+        RemoteTierConsumerAgent remoteTierConsumerAgent =
+                new RemoteTierConsumerAgent(
+                        new RemoteStorageScanner(remoteStoragePath),
+                        new TestingPartitionFileReader.Builder().build(),
+                        1024);
+        assertThat(
+                        remoteTierConsumerAgent.getNextBuffer(
+                                new TieredStoragePartitionId(new ResultPartitionID()),
+                                new TieredStorageSubpartitionId(0),
+                                0))
+                .isEmpty();
+    }
+
+    @Test
+    void testGetBuffer() {
+        int bufferSize = 10;
+        PartitionFileReader partitionFileReader =
+                new TestingPartitionFileReader.Builder()
+                        .setReadBufferSupplier(
+                                (bufferIndex, segmentId) ->
+                                        new PartitionFileReader.ReadBufferResult(
+                                                Collections.singletonList(
+                                                        BufferBuilderTestUtils.buildSomeBuffer(
+                                                                bufferSize)),
+                                                false,
+                                                null))
+                        .build();
+        RemoteTierConsumerAgent remoteTierConsumerAgent =
+                new RemoteTierConsumerAgent(
+                        new RemoteStorageScanner(remoteStoragePath), partitionFileReader, 1024);
+        Optional<Buffer> optionalBuffer =
+                remoteTierConsumerAgent.getNextBuffer(
+                        new TieredStoragePartitionId(new ResultPartitionID()),
+                        new TieredStorageSubpartitionId(0),
+                        0);
+        assertThat(optionalBuffer).isPresent();
+        assertThat(optionalBuffer.get().readableBytes()).isEqualTo(bufferSize);

Review Comment:
   Maybe this can be rewritten with `hasValueSatisfying`.
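   For instance, a minimal sketch of that suggestion (assuming `optionalBuffer` 
and `bufferSize` from the test above):

   ```java
   import static org.assertj.core.api.Assertions.assertThat;

   // Collapses the isPresent() + get() pair into one Optional assertion.
   assertThat(optionalBuffer)
           .hasValueSatisfying(
                   buffer -> assertThat(buffer.readableBytes()).isEqualTo(bufferSize));
   ```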






[GitHub] [flink] wangzzu commented on a diff in pull request #23405: [FLINK-31895] Add End-to-end integration tests for failure labels

2023-09-14 Thread via GitHub


wangzzu commented on code in PR #23405:
URL: https://github.com/apache/flink/pull/23405#discussion_r1326748724


##
flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/streaming/tests/FailureEnricherTestProgram.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * End-to-end test program for verifying that the {@link
+ * org.apache.flink.configuration.JobManagerOptions#FAILURE_ENRICHERS_LIST}. We test this by
+ * creating a {@code CustomTestFailureEnricherFactory} and {@code CustomEnricher} which will add a

Review Comment:
   fixed






[GitHub] [flink] wangzzu commented on pull request #23399: [FLINK-33061][docs] Translate failure-enricher documentation to Chinese

2023-09-14 Thread via GitHub


wangzzu commented on PR #23399:
URL: https://github.com/apache/flink/pull/23399#issuecomment-1720436861

   @flinkbot run azure





[GitHub] [flink] JunRuiLee commented on pull request #23420: test

2023-09-14 Thread via GitHub


JunRuiLee commented on PR #23420:
URL: https://github.com/apache/flink/pull/23420#issuecomment-1720408312

   @flinkbot run azure





[jira] [Commented] (FLINK-33090) CheckpointsCleaner clean individual checkpoint states in parallel

2023-09-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765423#comment-17765423
 ] 

Hangxiang Yu commented on FLINK-33090:
--

Thanks for the proposal.

We also saw a related problem in FLINK-26590, which I just linked.

Would you like to contribute your code?

> CheckpointsCleaner clean individual checkpoint states in parallel
> -
>
> Key: FLINK-33090
> URL: https://issues.apache.org/jira/browse/FLINK-33090
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.1
>Reporter: Yi Zhang
>Priority: Major
>
> Currently CheckpointsCleaner cleans multiple checkpoints in parallel with 
> the JobManager's ioExecutor; however, each checkpoint's states are cleaned 
> sequentially. With thousands of StateObjects to clean, this can take a long 
> time on some checkpoint storage, and if it takes longer than the checkpoint 
> interval it prevents new checkpointing.
> The proposal is to use the same ioExecutor to clean up each checkpoint's 
> states in parallel as well. From my local testing, with default settings for 
> the ioExecutor thread pool and xK state files, this can reduce cleanup time 
> from 10 minutes to <1 minute. 





[GitHub] [flink] KarmaGYZ commented on pull request #23415: [FLINK-33053][zookeeper] Manually remove the leader watcher after ret…

2023-09-14 Thread via GitHub


KarmaGYZ commented on PR #23415:
URL: https://github.com/apache/flink/pull/23415#issuecomment-1720403714

   Thanks for the comment, @tisonkun . IIUC, if each client only removes the 
watchers of its own session, clients will not affect each other. 





[GitHub] [flink] huwh commented on a diff in pull request #23405: [FLINK-31895] Add End-to-end integration tests for failure labels

2023-09-14 Thread via GitHub


huwh commented on code in PR #23405:
URL: https://github.com/apache/flink/pull/23405#discussion_r1326709984


##
flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricher.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.enricher;
+
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/** The custom enricher for test. */
+public class CustomTestFailureEnricher implements FailureEnricher {
+
+    private final Set<String> outputKeys;
+
+    public CustomTestFailureEnricher() {
+        this.outputKeys = Collections.singleton("type");
+        ;

Review Comment:
   ```suggestion
   ```



##
flink-end-to-end-tests/flink-failure-enricher-test/src/main/java/org/apache/flink/streaming/tests/FailureEnricherTestProgram.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * End-to-end test program for verifying that the {@link
+ * org.apache.flink.configuration.JobManagerOptions#FAILURE_ENRICHERS_LIST}. We test this by
+ * creating a {@code CustomTestFailureEnricherFactory} and {@code CustomEnricher} which will add a

Review Comment:
   CustomEnricher -> CustomTestFailureEnricher






[jira] [Updated] (FLINK-32953) [State TTL]resolve data correctness problem after ttl was changed

2023-09-14 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-32953:
-
Fix Version/s: 1.19.0

> [State TTL]resolve data correctness problem after ttl was changed 
> --
>
> Key: FLINK-32953
> URL: https://issues.apache.org/jira/browse/FLINK-32953
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Because expired data is cleaned up in the background on a best-effort basis 
> (hashmap uses the INCREMENTAL_CLEANUP strategy, rocksdb uses the 
> ROCKSDB_COMPACTION_FILTER strategy), some expired state is often persisted 
> into snapshots.
>  
> In some scenarios, a user changes the state ttl of the job and then restores 
> the job from the old state. If the user adjusts the state ttl from a short 
> value to a long value (e.g., from 12 hours to 24 hours), some expired data 
> that was not cleaned up will still be alive after the restore. Obviously this 
> is unreasonable, and may break data regulatory requirements. 
>  
> Particularly, the rocksdb stateBackend may cause data correctness problems 
> due to level compaction in this case. (E.g., one key has two versions, at 
> level-1 and level-2, both of which are ttl-expired. Then the level-1 version 
> is cleaned up by compaction and the level-2 version isn't. If we adjust the 
> state ttl and restart the job, the incorrect data at level-2 will become 
> valid after the restore.)
>  
> To solve this problem, I think we can
> 1) persist the old state ttl into the snapshot meta info (e.g. in 
> RegisteredKeyValueStateBackendMetaInfo or elsewhere);
> 2) during state restore, compare the current ttl with the old ttl;
> 3) if the current ttl is longer than the old ttl, iterate over all data, 
> filter out data that is expired under the old ttl, and write the valid data 
> into the stateBackend.
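A minimal sketch of the restore-time check in steps 2) and 3) above (hedged: 
the names and the timestamp layout are illustrative assumptions, not Flink's 
actual TTL metadata):

{code:java}
import java.time.Duration;

public class TtlRestoreCheckSketch {

    // Re-filtering is only needed when the TTL was increased: only then can
    // entries that expired under the old TTL "come back to life".
    static boolean mustFilterExpiredState(Duration oldTtl, Duration newTtl) {
        return newTtl.compareTo(oldTtl) > 0;
    }

    // Expiry test against the OLD ttl, applied while iterating over all data.
    static boolean isExpiredUnderOldTtl(long lastAccessMillis, Duration oldTtl, long nowMillis) {
        return lastAccessMillis + oldTtl.toMillis() <= nowMillis;
    }

    public static void main(String[] args) {
        System.out.println(
                mustFilterExpiredState(Duration.ofHours(12), Duration.ofHours(24))); // true
    }
}
{code}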





[jira] [Updated] (FLINK-33022) Log an error when enrichers defined as part of the configuration can not be found/loaded

2023-09-14 Thread Weihua Hu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weihua Hu updated FLINK-33022:
--
Release Note:   (was: Resolved in master: 
b51ee30c3d1a212947398d880a676f07f46f36be)

> Log an error when enrichers defined as part of the configuration can not be 
> found/loaded
> 
>
> Key: FLINK-33022
> URL: https://issues.apache.org/jira/browse/FLINK-33022
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matt Wang
>Assignee: Panagiotis Garefalakis
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> If we configure `jobmanager.failure-enrichers` but the class could not be 
> loaded in FailureEnricherUtils, no exceptions can be seen in the log 
> currently, and it is very inconvenient to troubleshoot the problem. Here I 
> suggest that some ERROR-level logs should be added, or an exception should 
> be thrown directly (because failing to load the class is not an expected 
> result).
> {code:java}
> // code placeholder
> @VisibleForTesting
> static Collection<FailureEnricher> getFailureEnrichers(
>         final Configuration configuration, final PluginManager pluginManager) {
>     Set<String> includedEnrichers = getIncludedFailureEnrichers(configuration);
>     LOG.info("includedEnrichers: {}", includedEnrichers);
>     // When empty, NO enrichers will be started.
>     if (includedEnrichers.isEmpty()) {
>         return Collections.emptySet();
>     }
>     // TODO: here maybe load nothing
>     final Iterator<FailureEnricherFactory> factoryIterator =
>             pluginManager.load(FailureEnricherFactory.class);
> 
> } {code}
>  
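A hedged sketch of the suggested ERROR log (illustrative names, not the 
actual FailureEnricherUtils code):

{code:java}
import java.util.HashSet;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EnricherLoadCheckSketch {
    private static final Logger LOG = LoggerFactory.getLogger(EnricherLoadCheckSketch.class);

    // Logs an ERROR for every configured enricher name that no plugin provided.
    static void logMissingEnrichers(Set<String> configured, Set<String> loaded) {
        Set<String> missing = new HashSet<>(configured);
        missing.removeAll(loaded);
        if (!missing.isEmpty()) {
            LOG.error(
                    "Failure enrichers {} were configured via jobmanager.failure-enrichers "
                            + "but could not be found/loaded.",
                    missing);
        }
    }
}
{code}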





[jira] [Commented] (FLINK-33022) Log an error when enrichers defined as part of the configuration can not be found/loaded

2023-09-14 Thread Weihua Hu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765421#comment-17765421
 ] 

Weihua Hu commented on FLINK-33022:
---

Resolved in master: b51ee30c3d1a212947398d880a676f07f46f36be

> Log an error when enrichers defined as part of the configuration can not be 
> found/loaded
> 
>
> Key: FLINK-33022
> URL: https://issues.apache.org/jira/browse/FLINK-33022
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matt Wang
>Assignee: Panagiotis Garefalakis
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> If we configure `jobmanager.failure-enrichers` but the class could not be 
> loaded in FailureEnricherUtils, no exceptions can be seen in the log 
> currently, and it is very inconvenient to troubleshoot the problem. Here I 
> suggest that some ERROR-level logs should be added, or an exception should 
> be thrown directly (because failing to load the class is not an expected 
> result).
> {code:java}
> // code placeholder
> @VisibleForTesting
> static Collection<FailureEnricher> getFailureEnrichers(
>         final Configuration configuration, final PluginManager pluginManager) {
>     Set<String> includedEnrichers = getIncludedFailureEnrichers(configuration);
>     LOG.info("includedEnrichers: {}", includedEnrichers);
>     // When empty, NO enrichers will be started.
>     if (includedEnrichers.isEmpty()) {
>         return Collections.emptySet();
>     }
>     // TODO: here maybe load nothing
>     final Iterator<FailureEnricherFactory> factoryIterator =
>             pluginManager.load(FailureEnricherFactory.class);
> 
> } {code}
>  





[jira] [Resolved] (FLINK-33022) Log an error when enrichers defined as part of the configuration can not be found/loaded

2023-09-14 Thread Weihua Hu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weihua Hu resolved FLINK-33022.
---
Fix Version/s: 1.19.0
 Release Note: Resolved in master: b51ee30c3d1a212947398d880a676f07f46f36be
   Resolution: Fixed

> Log an error when enrichers defined as part of the configuration can not be 
> found/loaded
> 
>
> Key: FLINK-33022
> URL: https://issues.apache.org/jira/browse/FLINK-33022
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matt Wang
>Assignee: Panagiotis Garefalakis
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> If we configure `jobmanager.failure-enrichers` but the class could not be 
> loaded in FailureEnricherUtils, no exceptions can be seen in the log 
> currently, and it is very inconvenient to troubleshoot the problem. Here I 
> suggest that some ERROR-level logs should be added, or an exception should 
> be thrown directly (because failing to load the class is not an expected 
> result).
> {code:java}
> // code placeholder
> @VisibleForTesting
> static Collection<FailureEnricher> getFailureEnrichers(
>         final Configuration configuration, final PluginManager pluginManager) {
>     Set<String> includedEnrichers = getIncludedFailureEnrichers(configuration);
>     LOG.info("includedEnrichers: {}", includedEnrichers);
>     // When empty, NO enrichers will be started.
>     if (includedEnrichers.isEmpty()) {
>         return Collections.emptySet();
>     }
>     // TODO: here maybe load nothing
>     final Iterator<FailureEnricherFactory> factoryIterator =
>             pluginManager.load(FailureEnricherFactory.class);
> 
> } {code}
>  





[jira] [Commented] (FLINK-32953) [State TTL]resolve data correctness problem after ttl was changed

2023-09-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765422#comment-17765422
 ] 

Hangxiang Yu commented on FLINK-32953:
--

merged 9a891f117c3a44a633316b84f9cbf2a541b80d11 into master

> [State TTL]resolve data correctness problem after ttl was changed 
> --
>
> Key: FLINK-32953
> URL: https://issues.apache.org/jira/browse/FLINK-32953
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
>
> Because expired data is cleaned up in the background on a best-effort basis 
> (hashmap uses the INCREMENTAL_CLEANUP strategy, rocksdb uses the 
> ROCKSDB_COMPACTION_FILTER strategy), some expired state is often persisted 
> into snapshots.
>  
> In some scenarios, a user changes the state ttl of the job and then restores 
> the job from the old state. If the user adjusts the state ttl from a short 
> value to a long value (e.g., from 12 hours to 24 hours), some expired data 
> that was not cleaned up will still be alive after the restore. Obviously this 
> is unreasonable, and may break data regulatory requirements. 
>  
> Particularly, the rocksdb stateBackend may cause data correctness problems 
> due to level compaction in this case. (E.g., one key has two versions, at 
> level-1 and level-2, both of which are ttl-expired. Then the level-1 version 
> is cleaned up by compaction and the level-2 version isn't. If we adjust the 
> state ttl and restart the job, the incorrect data at level-2 will become 
> valid after the restore.)
>  
> To solve this problem, I think we can
> 1) persist the old state ttl into the snapshot meta info (e.g. in 
> RegisteredKeyValueStateBackendMetaInfo or elsewhere);
> 2) during state restore, compare the current ttl with the old ttl;
> 3) if the current ttl is longer than the old ttl, iterate over all data, 
> filter out data that is expired under the old ttl, and write the valid data 
> into the stateBackend.





[GitHub] [flink] masteryhx closed pull request #23407: [FLINK-32953][docs]Add notes about changing state ttl value

2023-09-14 Thread via GitHub


masteryhx closed pull request #23407: [FLINK-32953][docs]Add notes about 
changing state ttl value
URL: https://github.com/apache/flink/pull/23407





[GitHub] [flink] JunRuiLee closed pull request #23390: test

2023-09-14 Thread via GitHub


JunRuiLee closed pull request #23390: test
URL: https://github.com/apache/flink/pull/23390





[GitHub] [flink] JunRuiLee commented on pull request #23420: test

2023-09-14 Thread via GitHub


JunRuiLee commented on PR #23420:
URL: https://github.com/apache/flink/pull/23420#issuecomment-1720393465

   @flinkbot run azure





[GitHub] [flink] TanYuxin-tyx commented on pull request #23419: [BP][FLINK-33088][network] Fix NullPointerException in RemoteTierConsumerAgent for tiered storage

2023-09-14 Thread via GitHub


TanYuxin-tyx commented on PR #23419:
URL: https://github.com/apache/flink/pull/23419#issuecomment-1720381286

   @flinkbot run azure





[GitHub] [flink] TanYuxin-tyx commented on pull request #23418: [FLINK-33088][network] Fix NullPointerException in RemoteTierConsumerAgent of tiered storage

2023-09-14 Thread via GitHub


TanYuxin-tyx commented on PR #23418:
URL: https://github.com/apache/flink/pull/23418#issuecomment-1720381365

   @flinkbot run azure





[GitHub] [flink-kubernetes-operator] srpraneeth commented on a diff in pull request #670: [FLINK-31871] Interpret Flink MemoryUnits according to the actual user input

2023-09-14 Thread via GitHub


srpraneeth commented on code in PR #670:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/670#discussion_r1326653274


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java:
##
@@ -478,6 +478,50 @@ public void testTaskManagerSpec() {
                 Double.valueOf(1), configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
     }
 
+    @Test
+    public void testApplyJobManagerSpecWithBiByteMemorySetting() {

Review Comment:
   Fixed






[GitHub] [flink-kubernetes-operator] srpraneeth commented on a diff in pull request #670: [FLINK-31871] Interpret Flink MemoryUnits according to the actual user input

2023-09-14 Thread via GitHub


srpraneeth commented on code in PR #670:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/670#discussion_r1326652816


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##
@@ -420,13 +421,24 @@ private void setResource(Resource resource, Configuration effectiveConfig, boole
                         ? JobManagerOptions.TOTAL_PROCESS_MEMORY
                         : TaskManagerOptions.TOTAL_PROCESS_MEMORY;
         if (resource.getMemory() != null) {
-            effectiveConfig.setString(memoryConfigOption.key(), resource.getMemory());
+            effectiveConfig.setString(
+                    memoryConfigOption.key(), parseResourceMemoryString(resource.getMemory()));
         }
 
         configureCpu(resource, effectiveConfig, isJM);
     }
 }
 
+// Using the K8s units specification for the JM and TM memory settings
+private String parseResourceMemoryString(String memory) {
+    try {
+        return MemorySize.parse(memory).toString();

Review Comment:
   After addressing the comments and using the Quantity parser as the 
fallback logic, below is the behaviour for the different cases. 
   Please review the same.
   
   | Configuration | Previous InterpretedValue | New InterpretedValue (Approach2) |
   |---|---|---|
   | 2g | 2147483648 b | 2147483648 b |
   | 2gb | 2147483648 b | 2147483648 b |
   | 2G | 2147483648 b | 2147483648 b |
   | 2 g | 2147483648 b | 2147483648 b |
   | 512m | 536870912 b | 536870912 b |
   | 2gi | Fail (Could not parse value '2gi') | 2147483648 b |
   | 2Gi | Fail (Could not parse value '2Gi') | 2147483648 b |
   | 2gib | Fail (Could not parse value '2gi') | 2147483648 b |
   | 2 Gi | Fail (Could not parse value '2 Gi') | 2147483648 b |
   | 512mi | Fail (Could not parse value '512mi') | 536870912 b |
   | 100b | 100 b | 100 b |
   | 100 b | 100 b | 100 b |
   | 1e6 b | Fail (Could not parse value '1e6 b') | 1000000 b |
   | 1e6 | Fail (Could not parse value '1e6') | 1000000 b |
   
   The only way for a user to configure giga (decimal) bytes is to convert 
them into numerical bytes and use that.
   For example: 2G = 2 * 1000 * 1000 * 1000 b = 2000000000 b
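   For reference, a minimal sketch of the fallback parsing under discussion 
(hedged: the PR's actual code may differ; `Quantity` here is fabric8's 
Kubernetes model class):

   ```java
   import io.fabric8.kubernetes.api.model.Quantity;

   import org.apache.flink.configuration.MemorySize;

   class MemoryParseSketch {
       // Try Flink's own syntax ("2g", "512m", ...) first; if that fails, fall
       // back to interpreting the value as a Kubernetes quantity ("2Gi", "1e6").
       static String parseResourceMemoryString(String memory) {
           try {
               return MemorySize.parse(memory).toString();
           } catch (IllegalArgumentException e) {
               long bytes = Quantity.getAmountInBytes(new Quantity(memory)).longValue();
               return new MemorySize(bytes).toString();
           }
       }
   }
   ```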






[jira] [Updated] (FLINK-31390) Optimize the FlinkChangelogModeInferenceProgram by avoiding unnecessary traversals.

2023-09-14 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-31390:
---
  Labels: auto-deprioritized-minor pull-request-available  (was: 
pull-request-available stale-minor)
Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Optimize the FlinkChangelogModeInferenceProgram by avoiding unnecessary 
> traversals.
> ---
>
> Key: FLINK-31390
> URL: https://issues.apache.org/jira/browse/FLINK-31390
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Not a Priority
>  Labels: auto-deprioritized-minor, pull-request-available
>
> We can avoid the unnecessary traversals of the RelNode tree, since we are 
> only interested in the first satisfied plan.
>  
> FlinkChangelogModeInferenceProgram
> {code:java}
> val updateKindTraitVisitor = new SatisfyUpdateKindTraitVisitor(context)
> val finalRoot = requiredUpdateKindTraits.flatMap {
>   requiredUpdateKindTrait =>
>     updateKindTraitVisitor.visit(rootWithModifyKindSet, requiredUpdateKindTrait)
> }
> {code}





[jira] [Updated] (FLINK-32941) Table API Bridge `toDataStream(targetDataType)` function not working correctly for Java List

2023-09-14 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-32941:
---
  Labels: auto-deprioritized-critical bridge  (was: bridge stale-critical)
Priority: Major  (was: Critical)

This issue was labeled "stale-critical" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Critical, 
please raise the priority and ask a committer to assign you the issue or revive 
the public discussion.


> Table API Bridge `toDataStream(targetDataType)` function not working 
> correctly for Java List
> 
>
> Key: FLINK-32941
> URL: https://issues.apache.org/jira/browse/FLINK-32941
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Tan Kim
>Priority: Major
>  Labels: auto-deprioritized-critical, bridge
>
> When the code below is executed, only the first element of the list is 
> assigned, repeatedly, to the List variable in MyPojo.
> {code:java}
> case class Item(
>   name: String
> )
> case class MyPojo(
>   @DataTypeHint("RAW") items: java.util.List[Item]
> )
> ...
> tableEnv
>   .sqlQuery("select items from table")
>   .toDataStream(DataTypes.of(classOf[MyPojo])) {code}
>  
> For example, if you have the following list coming in as input:
> ["a","b","c"]
> the value actually stored in MyPojo's list variable is
> ["a","a","a"] 





[jira] [Updated] (FLINK-32862) Support INIT operation type to be compatible with DTS on Alibaba Cloud

2023-09-14 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-32862:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Support INIT operation type to be compatible with DTS on Alibaba Cloud
> --
>
> Key: FLINK-32862
> URL: https://issues.apache.org/jira/browse/FLINK-32862
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Hang Ruan
>Assignee: Hang Ruan
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
>
> The operation type of canal json messages from DTS on Alibaba Cloud may 
> contain a new type `INIT`. We cannot handle these messages.





[jira] [Updated] (FLINK-33091) Limit on outgoing connections to 64 seems unnecessary

2023-09-14 Thread Rogan Morrow (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rogan Morrow updated FLINK-33091:
-
Affects Version/s: 1.17.1

> Limit on outgoing connections to 64 seems unnecessary
> -
>
> Key: FLINK-33091
> URL: https://issues.apache.org/jira/browse/FLINK-33091
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.17.1
>Reporter: Rogan Morrow
>Priority: Major
>
> We have a job that results in a node having more than 64 outputs. However, 
> submitting the job fails with the error "Cannot currently handle nodes with 
> more than 64 outputs." The error originates from this line:
> [https://github.com/apache/flink/blob/e7eeea033a68e1ff6bf82132b5a59eb0a5a2d0ed/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java#L355]
> There is no explanation in the code for why this arbitrary limit is set, so 
> I'm wondering what its purpose is.
> After forking Flink and removing the above line so that there is no limit, 
> the job works. So it seems that the limit is unnecessary and can be removed.





[jira] [Created] (FLINK-33091) Limit on outgoing connections to 64 seems unnecessary

2023-09-14 Thread Rogan Morrow (Jira)
Rogan Morrow created FLINK-33091:


 Summary: Limit on outgoing connections to 64 seems unnecessary
 Key: FLINK-33091
 URL: https://issues.apache.org/jira/browse/FLINK-33091
 Project: Flink
  Issue Type: Improvement
Reporter: Rogan Morrow


We have a job that results in a node having more than 64 outputs. However, 
submitting the job fails with the error "Cannot currently handle nodes with 
more than 64 outputs." The error originates from this line:

[https://github.com/apache/flink/blob/e7eeea033a68e1ff6bf82132b5a59eb0a5a2d0ed/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java#L355]

There is no explanation in the code for why this arbitrary limit is set, so 
I'm wondering what its purpose is.

After forking Flink and removing the above line so that there is no limit, the 
job works. So it seems that the limit is unnecessary and can be removed.





[jira] [Closed] (FLINK-33029) Drop python 3.7 support

2023-09-14 Thread Matyas Orhidi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matyas Orhidi closed FLINK-33029.
-
Resolution: Fixed

fixed via merging 6ef837b11cc12d50b9f0e9306acfb786cc8804d1 into main

> Drop python 3.7 support
> ---
>
> Key: FLINK-33029
> URL: https://issues.apache.org/jira/browse/FLINK-33029
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>






[GitHub] [flink] morhidi merged pull request #23421: [FLINK-33029][python] fix drop python 3.7 wheel building

2023-09-14 Thread via GitHub


morhidi merged PR #23421:
URL: https://github.com/apache/flink/pull/23421





[jira] [Commented] (FLINK-26088) Add Elasticsearch 8.0 support

2023-09-14 Thread Matheus Felisberto (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765370#comment-17765370
 ] 

Matheus Felisberto commented on FLINK-26088:


Hi team, I want to give a heads-up here. I'm writing tests this whole week, 
and I think I'll be asking for your review really soon. I'm sorry I haven't 
finished yet.

> Add Elasticsearch 8.0 support
> -
>
> Key: FLINK-26088
> URL: https://issues.apache.org/jira/browse/FLINK-26088
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: Yuhao Bi
>Assignee: Matheus Felisberto
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> Since Elasticsearch 8.0 is officially released, I think it's time to consider 
> adding es8 connector support.
> The High Level REST Client we used for connection [is marked deprecated in es 
> 7.15.0|https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html].
>  Maybe we can migrate to use the new [Java API 
> Client|https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.0/index.html]
>  at this time.
> Elasticsearch8.0 release note: 
> [https://www.elastic.co/guide/en/elasticsearch/reference/8.0/release-notes-8.0.0.html]
> release highlights: 
> [https://www.elastic.co/guide/en/elasticsearch/reference/8.0/release-highlights.html]
> REST API compatibility: 
> https://www.elastic.co/guide/en/elasticsearch/reference/8.0/rest-api-compatibility.html





[GitHub] [flink] gaborgsomogyi commented on pull request #23421: [FLINK-33029][python] fix drop python 3.7 wheel building

2023-09-14 Thread via GitHub


gaborgsomogyi commented on PR #23421:
URL: https://github.com/apache/flink/pull/23421#issuecomment-1720187577

   Finally I was able to test the wheel-building part in the PR CI. Azure 
passed with 
https://github.com/apache/flink/commit/771406ad79daaeef126ee3526f48adc500b09c68 
[here](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=53248).
   (Screenshot: https://github.com/apache/flink/assets/18561820/6a2d614e-6746-47bf-a6b2-1cb1650bb71d)
   Rolling back the test code and leaving only the fix in the PR.





[jira] [Updated] (FLINK-33090) CheckpointsCleaner clean individual checkpoint states in parallel

2023-09-14 Thread Yi Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yi Zhang updated FLINK-33090:
-
Description: 
Currently CheckpointsCleaner cleans multiple checkpoints in parallel with the 
JobManager's ioExecutor; however, each checkpoint's states are cleaned 
sequentially. With thousands of StateObjects to clean, this can take a long 
time on some checkpoint storage, and if it takes longer than the checkpoint 
interval it prevents new checkpointing.

The proposal is to use the same ioExecutor to clean up each checkpoint's 
states in parallel as well. From my local testing, with default settings for 
the ioExecutor thread pool and xK state files, this can reduce cleanup time 
from 10 minutes to <1 minute. 

  was:
Currently CheckpointsCleaner can clean multiple checkpoints in parallel with 
the JobManager's ioExecutor; however, each checkpoint's states are cleaned 
sequentially. With thousands of StateObjects to clean, this can take a long 
time on some checkpoint storage, and if it takes longer than the checkpoint 
interval it prevents new checkpointing.

The proposal is to use the same ioExecutor to clean up each checkpoint's 
states in parallel as well. From my local testing, with default settings for 
the ioExecutor thread pool and xK state files, this can reduce cleanup time 
from 10 minutes to <1 minute. 


> CheckpointsCleaner clean individual checkpoint states in parallel
> -
>
> Key: FLINK-33090
> URL: https://issues.apache.org/jira/browse/FLINK-33090
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.1
>Reporter: Yi Zhang
>Priority: Major
>
> Currently CheckpointsCleaner cleans multiple checkpoints in parallel with 
> the JobManager's ioExecutor; however, each checkpoint's states are cleaned 
> sequentially. With thousands of StateObjects to clean, this can take a long 
> time on some checkpoint storage, and if it takes longer than the checkpoint 
> interval it prevents new checkpointing.
> The proposal is to use the same ioExecutor to clean up each checkpoint's 
> states in parallel as well. From my local testing, with default settings for 
> the ioExecutor thread pool and xK state files, this can reduce cleanup time 
> from 10 minutes to <1 minute. 





[jira] [Created] (FLINK-33090) CheckpointsCleaner clean individual checkpoint states in parallel

2023-09-14 Thread Yi Zhang (Jira)
Yi Zhang created FLINK-33090:


 Summary: CheckpointsCleaner clean individual checkpoint states in 
parallel
 Key: FLINK-33090
 URL: https://issues.apache.org/jira/browse/FLINK-33090
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.17.1
Reporter: Yi Zhang


Currently CheckpointsCleaner can clean multiple checkpoints in parallel with 
the JobManager's ioExecutor; however, each checkpoint's states are cleaned 
sequentially. With thousands of StateObjects to clean, this can take a long 
time on some checkpoint storage, and if it takes longer than the checkpoint 
interval it prevents new checkpointing.

The proposal is to use the same ioExecutor to clean up each checkpoint's 
states in parallel as well. From my local testing, with default settings for 
the ioExecutor thread pool and xK state files, this can reduce cleanup time 
from 10 minutes to <1 minute. 
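A minimal sketch of the proposed parallelization (the StateObject interface 
here is a stand-in; Flink's actual CheckpointsCleaner and StateObject APIs 
differ):

{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public class ParallelCheckpointCleanupSketch {

    // Stand-in for Flink's StateObject; only the disposal hook matters here.
    interface StateObject {
        void discardState() throws Exception;
    }

    // Submits every state object's disposal to the shared I/O executor
    // instead of discarding them one by one on a single thread.
    static CompletableFuture<Void> cleanCheckpointStates(
            List<StateObject> states, ExecutorService ioExecutor) {
        CompletableFuture<?>[] futures =
                states.stream()
                        .map(
                                state ->
                                        CompletableFuture.runAsync(
                                                () -> {
                                                    try {
                                                        state.discardState();
                                                    } catch (Exception e) {
                                                        throw new RuntimeException(e);
                                                    }
                                                },
                                                ioExecutor))
                        .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }
}
{code}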





[GitHub] [flink] JunRuiLee commented on pull request #23420: test

2023-09-14 Thread via GitHub


JunRuiLee commented on PR #23420:
URL: https://github.com/apache/flink/pull/23420#issuecomment-1719863133

   @flinkbot run azure





[GitHub] [flink] elkhand commented on a diff in pull request #23406: [FLINK-32884] [flink-clients] PyFlink remote execution should support URLs with paths and https scheme

2023-09-14 Thread via GitHub


elkhand commented on code in PR #23406:
URL: https://github.com/apache/flink/pull/23406#discussion_r1326304575


##
flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java:
##
@@ -46,6 +46,28 @@ void testCommandLineMaterialization() throws Exception {
 
         assertThat(configuration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
         assertThat(configuration.get(RestOptions.PORT)).isEqualTo(port);
+
+        final String httpProtocol = "http";
+        final String hostnameWithHttpScheme = httpProtocol + "://" + hostname;
+        final String[] httpArgs = {"-m", hostnameWithHttpScheme + ':' + port};
+        final CommandLine httpCommandLine = defaultCLI.parseCommandLineOptions(httpArgs, false);
+
+        Configuration newConfiguration = defaultCLI.toConfiguration(httpCommandLine);
+
+        assertThat(newConfiguration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
+        assertThat(newConfiguration.get(RestOptions.PORT)).isEqualTo(port);
+        assertThat(newConfiguration.get(RestOptions.PROTOCOL)).isEqualTo(httpProtocol);
+
+        final String httpsProtocol = "https";
+        final String hostnameWithHttpsScheme = httpsProtocol + "://" + hostname;
+        final String[] httpsArgs = {"-m", hostnameWithHttpsScheme + ':' + port};
+        final CommandLine httpsCommandLine = defaultCLI.parseCommandLineOptions(httpsArgs, false);
+
+        Configuration httpsConfiguration = defaultCLI.toConfiguration(httpsCommandLine);
+
+        assertThat(httpsConfiguration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
+        assertThat(httpsConfiguration.get(RestOptions.PORT)).isEqualTo(port);
+        assertThat(httpsConfiguration.get(RestOptions.PROTOCOL)).isEqualTo(httpsProtocol);

Review Comment:
   Added, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JunRuiLee commented on pull request #23420: test

2023-09-14 Thread via GitHub


JunRuiLee commented on PR #23420:
URL: https://github.com/apache/flink/pull/23420#issuecomment-1719852147

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JunRuiLee commented on pull request #23420: test

2023-09-14 Thread via GitHub


JunRuiLee commented on PR #23420:
URL: https://github.com/apache/flink/pull/23420#issuecomment-1719837084

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pgaref commented on pull request #23386: [FLINK-33022] Log an error when enrichers defined as part of the configuration can not be found/loaded

2023-09-14 Thread via GitHub


pgaref commented on PR #23386:
URL: https://github.com/apache/flink/pull/23386#issuecomment-1719696702

   > Sorry, I didn't express myself clearly earlier. Please take a look again
   
   Got it! Thanks again @huwh -- updated! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33086) Protect failure enrichment against unhandled exceptions

2023-09-14 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-33086.

Fix Version/s: 1.18.0
   Resolution: Fixed

master: e58a718d04e3ec6e2a43da8a868e5515916c0eea
1.18: 206609e822a8029a78245e6eef7ab5d88a0f370b

> Protect failure enrichment against unhandled exceptions
> ---
>
> Key: FLINK-33086
> URL: https://issues.apache.org/jira/browse/FLINK-33086
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The existing 
> [labelFailure|https://github.com/apache/flink/blob/603181da811edb47c0d573492639a381fbbedc28/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java#L175]
>  async logic expects FailureEnricher futures to never fail (or to do their own 
> exception handling), but there is no way to enforce that since enrichers are 
> implemented as pluggable components. A single failing future could result in 
> throwing away labels from other enrichers that completed successfully. 
> A better solution is to handle the failures and LOG the errors.
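
A minimal sketch of that guard (the class and method names below are
illustrative, not the actual FailureEnricherUtils code): wrapping each
enricher's future with `exceptionally()` so one failing plugin cannot poison
the labels of the others.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SafeLabeling {

    private static final Logger LOG = LoggerFactory.getLogger(SafeLabeling.class);

    /** Shields the caller from an enricher future that completes exceptionally. */
    static CompletableFuture<Map<String, String>> guard(
            String enricherName, CompletableFuture<Map<String, String>> labels) {
        return labels.exceptionally(
                t -> {
                    LOG.error("FailureEnricher {} threw; dropping its labels.",
                            enricherName, t);
                    return Collections.emptyMap(); // other enrichers' labels survive
                });
    }
}
```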



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol merged pull request #23413: [FLINK-33086] Protect failure enrichment against unhandled exceptions

2023-09-14 Thread via GitHub


zentol merged PR #23413:
URL: https://github.com/apache/flink/pull/23413


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33089) Drop Flink 1.14 support

2023-09-14 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-33089:
--

 Summary: Drop Flink 1.14 support
 Key: FLINK-33089
 URL: https://issues.apache.org/jira/browse/FLINK-33089
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora
 Fix For: kubernetes-operator-1.7.0


As agreed with the community we will only support the last 4 stable Flink minor 
versions.
With Flink 1.18 already out, we should drop 1.14 support from the operator.

This includes removing any special codepaths required for it, and we should 
probably throw a validation error and short-circuit reconciliation on 
unsupported versions to signal this to users and avoid any accidental 
deployment problems.
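
A rough sketch of what the proposed validation short-circuit could look like
(all names below are illustrative, not the operator's actual validator API;
the supported-version floor assumes the last-4-versions policy above):

```java
import java.util.Optional;

public class VersionValidation {

    /** Illustrative subset of the operator's FlinkVersion enum. */
    enum FlinkVersion { v1_13, v1_14, v1_15, v1_16, v1_17, v1_18 }

    private static final FlinkVersion OLDEST_SUPPORTED = FlinkVersion.v1_15;

    /** Returns an error message for unsupported versions, empty otherwise. */
    static Optional<String> validateFlinkVersion(FlinkVersion version) {
        if (version.compareTo(OLDEST_SUPPORTED) < 0) {
            // Reconciliation would stop here and surface this to the user.
            return Optional.of(
                    "Flink version " + version
                            + " is no longer supported by the operator; please"
                            + " upgrade to " + OLDEST_SUPPORTED + " or later.");
        }
        return Optional.empty(); // reconciliation may proceed
    }
}
```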



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1326121636


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex 
executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting 
job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) 
> 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale 
is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). 
Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. 
Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   > This is all about stability; "I could have rescaled and it wouldn't have 
been a problem.". But you can only conclude that after having had that slot for 
some period of time; otherwise you're just guessing.
   
   Exactly. So, when a slot arrives, scheduling a rescale-check after a timeout 
seems like a good trade-off to ensure that the slot is still there after a 
period of time. When the timeout fires (see the sketch below):
   - If the slot is still there, we should force-rescale (the 
better-late-than-never case)
   - If the slot has vanished, we should determine that the target parallelism 
is the same as the current one and not rescale (the avoid-unnecessary-rescale case)
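
A self-contained sketch of that check-again-later pattern (a plain
`ScheduledExecutorService` stands in for the scheduler's own executor, and all
names are illustrative; the real Executing state would run this on the main
thread and re-check the execution graph):

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class DeferredRescaleCheck {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // At most one pending check; single-threaded execution assumed, as in the
    // scheduler's main thread model.
    private boolean rescaleScheduled = false;

    /** Called when new slots arrive; re-evaluates once the cooldown elapsed. */
    void onNewResourcesAvailable(
            Duration cooldown, BooleanSupplier slotsStillUseful, Runnable rescale) {
        if (rescaleScheduled) {
            return;
        }
        rescaleScheduled = true;
        scheduler.schedule(
                () -> {
                    rescaleScheduled = false;
                    if (slotsStillUseful.getAsBoolean()) {
                        rescale.run(); // slot survived the wait: better late than never
                    }
                    // otherwise the slot vanished: no unnecessary restart
                },
                cooldown.toMillis(),
                TimeUnit.MILLISECONDS);
    }
}
```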



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23421: [FLINK-33029][python] fix drop python 3.7 wheel building

2023-09-14 Thread via GitHub


flinkbot commented on PR #23421:
URL: https://github.com/apache/flink/pull/23421#issuecomment-1719612819

   
   ## CI report:
   
   * c8535a0b53933444af8861e150349e360a0c2c8a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi opened a new pull request, #23421: [FLINK-33029][python] fix drop python 3.7 wheel building

2023-09-14 Thread via GitHub


gaborgsomogyi opened a new pull request, #23421:
URL: https://github.com/apache/flink/pull/23421

   ## What is the purpose of the change
   
   The nightly wheel building job fails because it still tries to build Python 3.7 wheels.
   
   ## Brief change log
   
   Fixed wheel build job.
   
   ## Verifying this change
   
   Checked manually.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Reopened] (FLINK-33029) Drop python 3.7 support

2023-09-14 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi reopened FLINK-33029:
---

> Drop python 3.7 support
> ---
>
> Key: FLINK-33029
> URL: https://issues.apache.org/jira/browse/FLINK-33029
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33029) Drop python 3.7 support

2023-09-14 Thread Gabor Somogyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17765205#comment-17765205
 ] 

Gabor Somogyi commented on FLINK-33029:
---

Nightly failed: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53196&view=logs&j=f73b5736-8355-5390-ec71-4dfdec0ce6c5&t=90f7230e-bf5a-531b-8566-ad48d3e03bbb

Creating a PR to fix it.

> Drop python 3.7 support
> ---
>
> Key: FLINK-33029
> URL: https://issues.apache.org/jira/browse/FLINK-33029
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


zentol commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1326042689


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex 
executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting 
job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) 
> 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale 
is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). 
Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. 
Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   > with this scenario we will restart every 5 min (as the timeout is 
exceeded, the min-increase check is overridden) 
   
   This depends on what your starting point for the timeout is. It shouldn't be 
fixed to the start of the job, but should be reset after the job was rescaled.
   So, a slot arrives at the 24h mark, the job is rescaled, the timeout is 
reset; for the next hour we only scale up if min-parallel-increase is 
satisfied; if a slot arrives after that, we'd immediately rescale.
   I still think this'd be problematic though, as described above.
   
   
   I think a good way to think about it is like this: we probably all had some 
situation in our lives where we thought "oh, if only I had done that 
earlier." Can be anything, really; let's say it's washing the dishes. You can 
still do that _now_, but had you done it earlier you may _now_ have been able 
to use your time better.
   That's ultimately what we're doing here.
   We look back, see that we had this slot the whole time, and go "ah, if only 
I had rescaled sooner, but better late than never".
   But on the flip side, if you lost that slot in the meantime and look back, 
you'd think "good job me, that was a good call to ignore that slot!".
   
   This is all about stability; "I could have rescaled and it wouldn't have 
been a problem.". But you can only conclude that after having had that slot for 
some period of time; otherwise you're just guessing.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


zentol commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1326017232


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex 
executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting 
job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) 
> 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale 
is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). 
Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. 
Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   > Should we cover this case?
   
   We should absolutely cover that case. The whole `scalingIntervalMax` idea 
was meant for that exact scenario; you have free slots below the 
min-parallelism-increase threshold for a long time. If you don't cover that 
case you just have the cooldown between rescale operations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JunRuiLee commented on pull request #23420: test

2023-09-14 Thread via GitHub


JunRuiLee commented on PR #23420:
URL: https://github.com/apache/flink/pull/23420#issuecomment-1719520673

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wangzzu commented on pull request #23405: [FLINK-31895] Add End-to-end integration tests for failure labels

2023-09-14 Thread via GitHub


wangzzu commented on PR #23405:
URL: https://github.com/apache/flink/pull/23405#issuecomment-1719516546

   @pgaref if you have time, can you help me review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wangzzu commented on a diff in pull request #23405: [FLINK-31895] Add End-to-end integration tests for failure labels

2023-09-14 Thread via GitHub


wangzzu commented on code in PR #23405:
URL: https://github.com/apache/flink/pull/23405#discussion_r1326002294


##
flink-end-to-end-tests/flink-end-to-end-failure-enricher-test/src/main/java/org/apache/flink/streaming/tests/FailureEnricherTestProgram.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * End-to-end test program for verifying that the {@link

Review Comment:
   This description refers to the description in `ClassLoaderTestProgram`. It 
has been modified here, and the specific script name has been added to the 
description. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] pnowojski commented on a diff in pull request #48: [FLINK-28758] Fix stop-with-savepoint for FlinkKafkaConsumer

2023-09-14 Thread via GitHub


pnowojski commented on code in PR #48:
URL: 
https://github.com/apache/flink-connector-kafka/pull/48#discussion_r1325995783


##
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerITCase.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/** ITCase tests class for {@link FlinkKafkaConsumer}. */
+public class FlinkKafkaConsumerITCase {
+private static final String TOPIC1 = "topic1";
+private static final String TOPIC2 = "topic2";
+
+@Nested

Review Comment:
   No, good point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JunRuiLee commented on pull request #23420: test

2023-09-14 Thread via GitHub


JunRuiLee commented on PR #23420:
URL: https://github.com/apache/flink/pull/23420#issuecomment-1719505789

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] pnowojski commented on a diff in pull request #48: [FLINK-28758] Fix stop-with-savepoint for FlinkKafkaConsumer

2023-09-14 Thread via GitHub


pnowojski commented on code in PR #48:
URL: 
https://github.com/apache/flink-connector-kafka/pull/48#discussion_r1325984355


##
flink-connector-kafka/src/test/resources/log4j2-test.properties:
##
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO

Review Comment:
   left over! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wangzzu commented on pull request #23399: [FLINK-33061][docs] Translate failure-enricher documentation to Chinese

2023-09-14 Thread via GitHub


wangzzu commented on PR #23399:
URL: https://github.com/apache/flink/pull/23399#issuecomment-1719495377

   @huwh thanks for your review, I have fixed these.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #670: [FLINK-31871] Interpret Flink MemoryUnits according to the actual user input

2023-09-14 Thread via GitHub


gyfora commented on code in PR #670:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/670#discussion_r1325961310


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##
@@ -420,13 +421,24 @@ private void setResource(Resource resource, Configuration 
effectiveConfig, boole
 ? JobManagerOptions.TOTAL_PROCESS_MEMORY
 : TaskManagerOptions.TOTAL_PROCESS_MEMORY;
 if (resource.getMemory() != null) {
-effectiveConfig.setString(memoryConfigOption.key(), 
resource.getMemory());
+effectiveConfig.setString(
+memoryConfigOption.key(), 
parseResourceMemoryString(resource.getMemory()));
 }
 
 configureCpu(resource, effectiveConfig, isJM);
 }
 }
 
+// Using the K8s units specification for the JM and TM memory settings
+private String parseResourceMemoryString(String memory) {
+try {
+return MemorySize.parse(memory).toString();

Review Comment:
   Even if Flink's MemoryUtils interprets decimal byte units incorrectly, we 
should not silently change the interpretation, as it may cause failures in 
currently deployed prod jobs. 
   
   We can fix this when we change the "major version", i.e. bump the CRD to v1. 
For now I think it is fine to interpret the current configs as before (even if 
technically incorrect at times) and open a follow-up JIRA to fix this for v1.
   
   Wdyt?
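
For context, a tiny demo of the unit mismatch under discussion (it only uses
`MemorySize.parse` from flink-core; the Kubernetes-side value is hard-coded
here for illustration): Flink reads "1g" as binary gibibytes, while Kubernetes
reads a plain "1G" quantity as decimal bytes.

```java
import org.apache.flink.configuration.MemorySize;

public class MemoryUnitsDemo {
    public static void main(String[] args) {
        long flinkBytes = MemorySize.parse("1g").getBytes(); // 1073741824 (1024^3)
        long k8sDecimalBytes = 1_000_000_000L;               // K8s reads "1G" as 10^9
        // The two disagree, which is why silently re-parsing existing user
        // input would change the meaning of deployed specs.
        System.out.println(flinkBytes + " vs " + k8sDecimalBytes);
    }
}
```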



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi commented on pull request #23417: [FLINK-33030][python]Add python 3.11 support

2023-09-14 Thread via GitHub


gaborgsomogyi commented on PR #23417:
URL: https://github.com/apache/flink/pull/23417#issuecomment-1719424923

   @HuangXingBo I've just double-checked and it seems like Maven Central is 
still showing `0.3.1` as the latest pemja release:
   https://central.sonatype.com/artifact/com.alibaba/pemja
   
   I presume we need this since Flink uses it as a dependency, right?
   
https://github.com/apache/flink/blob/c7f6470bb8cc314e7651b03e171af057f4edec1e/flink-python/pom.xml#L126-L132
   Could you please help to release the new version?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23420: test

2023-09-14 Thread via GitHub


flinkbot commented on PR #23420:
URL: https://github.com/apache/flink/pull/23420#issuecomment-1719406416

   
   ## CI report:
   
   * d539200c637f9fd6becd427e8985e6e442bcb7ac UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-33051) GlobalFailureHandler interface should be retired in favor of LabeledGlobalFailureHandler

2023-09-14 Thread Matt Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17765162#comment-17765162
 ] 

Matt Wang edited comment on FLINK-33051 at 9/14/23 12:57 PM:
-

[~pgaref] Let me think about it. I don't think it's entirely reasonable to 
replace GlobalFailureHandler with LabeledGlobalFailureHandler. Now the 
implementation class of GlobalFailureHandler is mainly in the scheduler, while 
the implementation class of LabeledGlobalFailureHandler is 
`org.apache.flink.runtime.scheduler.adaptive.State`. The `failureLabels` 
information is obtained in the scheduler (`DefaultScheduler`/`AdaptiveScheduler`), 
and the API of Scheduler's `handleGlobalFailure(Throwable cause, 
CompletableFuture<Map<String, String>> failureLabels)` is not reasonable.

However, GlobalFailureHandler and LabeledGlobalFailureHandler have overlapping 
functionality. Here, I tend to unify GlobalFailureHandler and 
LabeledGlobalFailureHandler into one interface, and State and scheduler will go 
through different methods. I will submit an MR today, and if you have time, 
help me review it.


was (Author: wangm92):
[~pgaref] Let me think about it. I don't think it's entirely reasonable to 
replace GlobalFailureHandler with LabeledGlobalFailureHandler. Now the 
implementation class of GlobalFailureHandler is mainly in the scheduler, while 
the implementation class of LabeledGlobalFailureHandler is 
`org.apache.flink.runtime.scheduler.adaptive.State`. The `failureLabels` 
information is obtained in the scheduler (DefaultScheduler/AdaptiveScheduler), 
and the API of Scheduler's `handleGlobalFailure(Throwable cause, 
CompletableFuture<Map<String, String>> failureLabels)` is not reasonable.

However, GlobalFailureHandler and LabeledGlobalFailureHandler have overlapping 
functionality. Here, I tend to unify GlobalFailureHandler and 
LabeledGlobalFailureHandler into one interface, and State and scheduler will go 
through different methods. I will submit an MR today, and if you have time, 
help me review it.

> GlobalFailureHandler interface should be retired in favor of 
> LabeledGlobalFailureHandler
> 
>
> Key: FLINK-33051
> URL: https://issues.apache.org/jira/browse/FLINK-33051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Panagiotis Garefalakis
>Assignee: Matt Wang
>Priority: Minor
>
> FLIP-304 introduced `LabeledGlobalFailureHandler` interface that is an 
> extension of `GlobalFailureHandler` interface. The latter can thus be removed 
> in the future to avoid the existence of interfaces with duplicate functions.
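
One possible shape for such a unified interface (purely illustrative; not the
final API proposed in this ticket): the label-aware method stays abstract for
the adaptive scheduler's State, and a default overload serves callers that
have no labels.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical merged handler; names mirror the two existing interfaces. */
public interface UnifiedGlobalFailureHandler {

    /** Label-aware variant, implemented by the adaptive scheduler's State. */
    void handleGlobalFailure(
            Throwable cause, CompletableFuture<Map<String, String>> failureLabels);

    /** Label-free variant for the schedulers; delegates with empty labels. */
    default void handleGlobalFailure(Throwable cause) {
        handleGlobalFailure(
                cause, CompletableFuture.completedFuture(Collections.emptyMap()));
    }
}
```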



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33051) Unify the GlobalFailureHandler and LabeledGlobalFailureHandler interface

2023-09-14 Thread Matt Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang updated FLINK-33051:
--
Summary: Unify the GlobalFailureHandler and LabeledGlobalFailureHandler 
interface  (was: GlobalFailureHandler interface should be retired in favor of 
LabeledGlobalFailureHandler)

> Unify the GlobalFailureHandler and LabeledGlobalFailureHandler interface
> 
>
> Key: FLINK-33051
> URL: https://issues.apache.org/jira/browse/FLINK-33051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Panagiotis Garefalakis
>Assignee: Matt Wang
>Priority: Minor
>
> FLIP-304 introduced `LabeledGlobalFailureHandler` interface that is an 
> extension of `GlobalFailureHandler` interface. The latter can thus be removed 
> in the future to avoid the existence of interfaces with duplicate functions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33051) GlobalFailureHandler interface should be retired in favor of LabeledGlobalFailureHandler

2023-09-14 Thread Matt Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17765162#comment-17765162
 ] 

Matt Wang commented on FLINK-33051:
---

[~pgaref] Let me think about it. I don't think it's entirely reasonable to 
replace GlobalFailureHandler with LabeledGlobalFailureHandler. Now the 
implementation class of GlobalFailureHandler is mainly in the scheduler, while 
the implementation class of LabeledGlobalFailureHandler is 
`org.apache.flink.runtime.scheduler.adaptive.State`. The `failureLabels` 
information is obtained in the scheduler (DefaultScheduler/AdaptiveScheduler), 
and the API of Scheduler's `handleGlobalFailure(Throwable cause, 
CompletableFuture<Map<String, String>> failureLabels)` is not reasonable.

However, GlobalFailureHandler and LabeledGlobalFailureHandler have overlapping 
functionality. Here, I tend to unify GlobalFailureHandler and 
LabeledGlobalFailureHandler into one interface, and State and scheduler will go 
through different methods. I will submit an MR today, and if you have time, 
help me review it.

> GlobalFailureHandler interface should be retired in favor of 
> LabeledGlobalFailureHandler
> 
>
> Key: FLINK-33051
> URL: https://issues.apache.org/jira/browse/FLINK-33051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Panagiotis Garefalakis
>Assignee: Matt Wang
>Priority: Minor
>
> FLIP-304 introduced `LabeledGlobalFailureHandler` interface that is an 
> extension of `GlobalFailureHandler` interface. The latter can thus be removed 
> in the future to avoid the existence of interfaces with duplicate functions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325907644


##
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##
@@ -488,6 +488,23 @@ public enum SchedulerType {
 
code(SchedulerExecutionMode.REACTIVE.name()))
 .build());
 
+@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+public static final ConfigOption SCHEDULER_SCALING_INTERVAL_MIN =
+key("jobmanager.adaptive-scheduler.scaling-interval.min")
+.durationType()
+.defaultValue(Duration.ofSeconds(30))
+// rescaling and let the user increase the value for high 
workloads
+.withDescription(
+"Determines the minimum time (in seconds) between 
scaling operations in reactive mode.");

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JunRuiLee opened a new pull request, #23420: test

2023-09-14 Thread via GitHub


JunRuiLee opened a new pull request, #23420:
URL: https://github.com/apache/flink/pull/23420

   run ci


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23419: [BP][FLINK-33088][network] Fix NullPointerException in RemoteTierConsumerAgent for tiered storage

2023-09-14 Thread via GitHub


flinkbot commented on PR #23419:
URL: https://github.com/apache/flink/pull/23419#issuecomment-1719385832

   
   ## CI report:
   
   * 08a938eaf6ac8b77f0bd9e294a59f95cad80a4f6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] KarmaGYZ commented on pull request #23415: [FLINK-33053][zookeeper] Manually remove the leader watcher after ret…

2023-09-14 Thread via GitHub


KarmaGYZ commented on PR #23415:
URL: https://github.com/apache/flink/pull/23415#issuecomment-1719374280

   @tisonkun would you like to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx opened a new pull request, #23419: [BP][FLINK-33088][network] Fix NullPointerException in RemoteTierConsumerAgent for tiered storage

2023-09-14 Thread via GitHub


TanYuxin-tyx opened a new pull request, #23419:
URL: https://github.com/apache/flink/pull/23419

   
   
   ## What is the purpose of the change
   
   *Backport to 1.18 for https://github.com/apache/flink/pull/23418.*
   
   
   ## Brief change log
   Backport to 1.18 for https://github.com/apache/flink/pull/23418.
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a backport without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23418: [FLINK-33088][network] Fix NullPointerException in RemoteTierConsumerAgent of tiered storage

2023-09-14 Thread via GitHub


flinkbot commented on PR #23418:
URL: https://github.com/apache/flink/pull/23418#issuecomment-1719365144

   
   ## CI report:
   
   * a970dae0e392bb46d63c7deba625e21a1703bd86 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33081) Move parallelism override logic into scale method

2023-09-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-33081.
--
Resolution: Fixed

merged to main 96f07a7296696f41dadfcaf827c95ddf22f83542

> Move parallelism override logic into scale method
> -
>
> Key: FLINK-33081
> URL: https://issues.apache.org/jira/browse/FLINK-33081
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> After FLINK-32589  the parallelism overrides are applied separately from the 
> scale call of the autoscaler implementation. We should simplify this by a 
> small refactoring



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] huwh commented on a diff in pull request #23399: [FLINK-33061][docs] Translate failure-enricher documentation to Chinese

2023-09-14 Thread via GitHub


huwh commented on code in PR #23399:
URL: https://github.com/apache/flink/pull/23399#discussion_r1325871369


##
docs/content.zh/docs/deployment/advanced/failure_enrichers.md:
##
@@ -21,34 +21,33 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Custom failure enrichers
-Flink provides a pluggable interface for users to register their custom logic 
and enrich failures with extra metadata labels (string key-value pairs).
-This enables users to implement their own failure enrichment plugins to 
categorize job failures, expose custom metrics, or make calls to external 
notification systems.
+## 自定义故障丰富器(Failure Enricher)
 
-FailureEnrichers are triggered every time an exception is reported at runtime 
by the JobManager.
-Every FailureEnricher may asynchronously return labels associated with the 
failure that are then exposed via the JobManager's REST API (e.g., a 
'type:System' label implying the failure is categorized as a system error).
+Flink 提供了一个可插拔的接口,供用户注册他们自定义的逻辑,并使用额外的元数据标签(字符串类型的 key-value 
对)来丰富故障(Failure)信息。
+这使得用户可以实现自定义的故障丰富器插件,对作业故障进行分类、对外暴露自定义指标,或者调用外部通知系统。
 
+每当 JobManager 在运行时收到异常时,都会触发 FailureEnrichers。
+每个 FailureEnricher 可以异步返回故障对应的标签(labels),这些标签可以通过 JobManager 的 REST API 
来查询(例如:"type:System" 的标签意味着该故障被分类为系统错误)。
 
-### Implement a plugin for your custom enricher
+### 实现一个自定义的故障丰富器插件
 
-To implement a custom FailureEnricher plugin, you need to:
+要实现自定义的 FailureEnricher 插件,需要按照下面的步骤:
 
-- Add your own FailureEnricher by implementing the {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java"
 name="FailureEnricher" >}} interface.
+- 通过实现 FailureEnricher 接口,添加自定义的 FailureEnricher;
 
-- Add your own FailureEnricherFactory by implementing the {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricherFactory.java"
 name="FailureEnricherFactory" >}} interface.
+- 通过实现 FailureEnricherFactory 接口,添加自定义的 FailureEnricherFactory;

Review Comment:
   ditto
   



##
docs/content.zh/docs/deployment/advanced/failure_enrichers.md:
##
@@ -21,34 +21,33 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Custom failure enrichers
-Flink provides a pluggable interface for users to register their custom logic 
and enrich failures with extra metadata labels (string key-value pairs).
-This enables users to implement their own failure enrichment plugins to 
categorize job failures, expose custom metrics, or make calls to external 
notification systems.
+## 自定义故障丰富器(Failure Enricher)
 
-FailureEnrichers are triggered every time an exception is reported at runtime 
by the JobManager.
-Every FailureEnricher may asynchronously return labels associated with the 
failure that are then exposed via the JobManager's REST API (e.g., a 
'type:System' label implying the failure is categorized as a system error).
+Flink 提供了一个可插拔的接口,供用户注册他们自定义的逻辑,并使用额外的元数据标签(字符串类型的 key-value 
对)来丰富故障(Failure)信息。
+这使得用户可以实现自定义的故障丰富器插件,对作业故障进行分类、对外暴露自定义指标,或者调用外部通知系统。
 
+每当 JobManager 在运行时收到异常时,都会触发 FailureEnrichers。
+每个 FailureEnricher 可以异步返回故障对应的标签(labels),这些标签可以通过 JobManager 的 REST API 
来查询(例如:"type:System" 的标签意味着该故障被分类为系统错误)。
 
-### Implement a plugin for your custom enricher
+### 实现一个自定义的故障丰富器插件
 
-To implement a custom FailureEnricher plugin, you need to:
+要实现自定义的 FailureEnricher 插件,需要按照下面的步骤:
 
-- Add your own FailureEnricher by implementing the {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java"
 name="FailureEnricher" >}} interface.
+- 通过实现 FailureEnricher 接口,添加自定义的 FailureEnricher;

Review Comment:
   Can we keep the source code link here?



##
docs/content.zh/docs/deployment/advanced/failure_enrichers.md:
##
@@ -21,34 +21,33 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Custom failure enrichers
-Flink provides a pluggable interface for users to register their custom logic 
and enrich failures with extra metadata labels (string key-value pairs).
-This enables users to implement their own failure enrichment plugins to 
categorize job failures, expose custom metrics, or make calls to external 
notification systems.
+## 自定义故障丰富器(Failure Enricher)
 
-FailureEnrichers are triggered every time an exception is reported at runtime 
by the JobManager.
-Every FailureEnricher may asynchronously return labels associated with the 
failure that are then exposed via the JobManager's REST API (e.g., a 
'type:System' label implying the failure is categorized as a system error).
+Flink 提供了一个可插拔的接口,供用户注册他们自定义的逻辑,并使用额外的元数据标签(字符串类型的 key-value 
对)来丰富故障(Failure)信息。
+这使得用户可以实现自定义的故障丰富器插件,对作业故障进行分类、对外暴露自定义指标,或者调用外部通知系统。
 
+每当 JobManager 在运行时收到异常时,都会触发 FailureEnrichers。
+每个 FailureEnricher 可以异步返回故障对应的标签(labels),这些标签可以通过 JobManager 的 REST API 
来查询(例如:"type:System" 的标签意味着该故障被分类为系统错误)。
 
-### Implement a plugin for your custom enricher
+### 实现一个自定义的故障丰富器插件
 
-To implement a 

[jira] [Updated] (FLINK-33088) Fix NullPointerException in RemoteTierConsumerAgent of tiered storage

2023-09-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33088:
---
Labels: pull-request-available  (was: )

> Fix NullPointerException in RemoteTierConsumerAgent of tiered storage
> -
>
> Key: FLINK-33088
> URL: https://issues.apache.org/jira/browse/FLINK-33088
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, when getting a buffer from RemoteTierConsumerAgent of tiered 
> storage, a NullPointerException may be thrown; we should fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] TanYuxin-tyx opened a new pull request, #23418: [FLINK-33088][network] Fix NullPointerException in RemoteTierConsumerAgent of tiered storage

2023-09-14 Thread via GitHub


TanYuxin-tyx opened a new pull request, #23418:
URL: https://github.com/apache/flink/pull/23418

   
   
   
   
   ## What is the purpose of the change
   
   *Currently, when getting a buffer from RemoteTierConsumerAgent of tiered 
storage, a NullPointerException may be thrown; we should fix it.*
   
   
   ## Brief change log
   
 - *Fix NullPointerException in RemoteTierConsumerAgent of tiered storage*
   
   
   ## Verifying this change
   
   This change added test `RemoteTierConsumerAgentTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325871122


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -47,6 +51,10 @@
 class Executing extends StateWithExecutionGraph implements ResourceListener {
 
 private final Context context;
+private Instant lastRescale = Instant.EPOCH;

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325847025


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex 
executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting 
job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) 
> 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale 
is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). 
Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. 
Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   Thanks Chesnay for your views (again) ! 
   > With option 2 we rescale right away because the timeout already elapsed 
within the previous 24 hours.
   
   This is what I proposed indeed, but you're right that
   
   > it might be side-stepping the intentions behind the min increase option 
and timeout
   
   if, for example, after the 24h mark resources start to arrive at 1 slot 
every 5 min, with this scenario we will restart every 5 min (as the timeout is 
exceeded, the min-increase check is overridden), which is what we want to 
avoid. So it is better to schedule a timeout when resources arrive and `added 
resources < min increase`. When the timeout fires, we do one single rescale 
that takes all the added slots in one shot.
   I'll do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33077) Minimize the risk of hard back-pressure with buffer debloating enabled

2023-09-14 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan closed FLINK-33077.
-
Resolution: Fixed

Fixed in c7f6470bb8cc314e7651b03e171af057f4edec1e.

> Minimize the risk of hard back-pressure with buffer debloating enabled
> --
>
> Key: FLINK-33077
> URL: https://issues.apache.org/jira/browse/FLINK-33077
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> *Problem:*
> Buffer debloating sets the buffer size to 256 bytes because of back-pressure.
> Such small buffers might not be enough to emit the processing results of a 
> single record. The task thread would request new buffers, and often block.
> That results in significant checkpoint delays (up to minutes instead of 
> seconds).
> Adding more overdraft buffers helps, but depends on the job's DoP.
> Raising taskmanager.memory.min-segment-size from 256 helps, but 
> depends on the multiplication factor of the operator.
> *Solution:*
>  * Ignore Buffer Debloater hints and extend the buffer if possible - when 
> this prevents emitting an output record fully AND this is the last available 
> buffer.
>  * Prevent the subsequent flush of the extended buffer so that more output 
> records can be emitted (flatMap-like and join operators).
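
A minimal simulation of the proposed decision logic (not Flink's actual buffer
code; the sizes and names below are made up for illustration): the debloater
hint is only overridden when the record would not fit and no further buffers
are available.

```java
public class DebloatOverrideSketch {

    static final int DEBLOATED_SIZE = 256;      // debloater's back-pressure hint
    static final int FULL_SEGMENT = 32 * 1024;  // configured memory segment size

    /** Picks the buffer size for the current record. */
    static int chooseBufferSize(int recordBytes, int availableBuffers) {
        if (recordBytes > DEBLOATED_SIZE && availableBuffers == 0) {
            // Last available buffer: ignore the debloater hint and extend, so
            // the record can be emitted fully without blocking the task thread.
            return Math.min(recordBytes, FULL_SEGMENT);
        }
        return DEBLOATED_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(chooseBufferSize(4096, 0)); // 4096: buffer extended
        System.out.println(chooseBufferSize(4096, 3)); // 256: hint respected
    }
}
```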



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] echauchot commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


echauchot commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325847025


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex 
executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting 
job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) 
> 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale 
is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). 
Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. 
Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   Thanks Chesnay for your views (again)!
   > With option 2 we rescale right away because the timeout already elapsed 
within the previous 24 hours.
   
   This is indeed what I proposed, but you're right that
   
   > it might be side-stepping the intentions behind the min increase option 
and timeout
   
   If, for example, after the 24h resources start to arrive at 1 slot every 5 min, 
we would restart every 5 min (as the timeout is always exceeded), which we want 
to avoid. So it is better to schedule a timeout when resources arrive and 
`added resources < min increase`. When the timeout fires, we do one single 
rescale that takes all the added slots in one shot.
   I'll do that.
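   
   A rough sketch of that scheduling idea (the method name is hypothetical; it 
assumes the `Context#runIfState` helper already used by the adaptive scheduler 
states):
   
   ```java
   // Sketch only: collapse a burst of slot arrivals into one deferred rescale
   // instead of restarting the job on every incoming slot.
   private void scheduleDeferredRescale(Duration delay) {
       if (!rescaleScheduled) {
           rescaleScheduled = true;
           // Runs maybeRescale() only if we are still in the Executing state.
           context.runIfState(this, this::maybeRescale, delay);
       }
   }
   ```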



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] huwh commented on a diff in pull request #23405: [FLINK-31895] Add End-to-end integration tests for failure labels

2023-09-14 Thread via GitHub


huwh commented on code in PR #23405:
URL: https://github.com/apache/flink/pull/23405#discussion_r1325821692


##
flink-end-to-end-tests/flink-end-to-end-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomEnricher.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.enricher;
+
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** The custom enricher for test. */
+public class CustomEnricher implements FailureEnricher {
+
+private final Set<String> outputKeys;
+
+public CustomEnricher() {
+this.outputKeys = Stream.of("type").collect(Collectors.toSet());

Review Comment:
   ```suggestion
   this.outputKeys = Collections.singleton("type");
   ```



##
flink-end-to-end-tests/flink-end-to-end-failure-enricher-test/src/main/java/org/apache/flink/streaming/tests/FailureEnricherTestProgram.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * End-to-end test program for verifying that the {@link

Review Comment:
   Can we move this documentation into test_failure_enricher.sh? This information 
is more a description of the logic of test_failure_enricher.sh.
   
   We can simply say that this program always throws a RuntimeException and is 
used by test_failure_enricher.sh.
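   
   Something along these lines would be enough (a hedged sketch, not the exact 
file under review):
   
   ```java
   import org.apache.flink.api.common.functions.MapFunction;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   
   /** A job whose only purpose is to fail at runtime for the enricher test. */
   public class FailureEnricherTestProgram {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env =
                   StreamExecutionEnvironment.getExecutionEnvironment();
           env.fromElements(1, 2, 3)
                   .map(
                           new MapFunction<Integer, Integer>() {
                               @Override
                               public Integer map(Integer value) {
                                   // Deterministic failure for the enricher to label.
                                   throw new RuntimeException("Expected test failure");
                               }
                           })
                   .print();
           env.execute("FailureEnricherTestProgram");
       }
   }
   ```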



##
flink-end-to-end-tests/flink-end-to-end-failure-enricher-test/src/main/java/org/apache/flink/runtime/enricher/CustomTestFailureEnricherFactory.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.enricher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.FailureEnricherFactory;
+
+/** The failure enricher factory for test. */

Review Comment:
   ```suggestion
   /** The factory of {@link CustomEnricher}. */
   ```



##
flink-end-to-end-tests/flink-end-to-end-failure-enricher-test/pom.xml:
##
@@ -0,0 +1,86 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

[jira] [Updated] (FLINK-31508) FLIP-304: Pluggable failure handling for Apache Flink

2023-09-14 Thread Matt Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang updated FLINK-31508:
--
Description: This is an umbrella ticket for 
[FLIP-304|https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers]
  (was: This is an umbrella ticket for 
[FLIP-304|https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+failure+handling+for+Apache+Flink])

> FLIP-304: Pluggable failure handling for Apache Flink
> -
>
> Key: FLINK-31508
> URL: https://issues.apache.org/jira/browse/FLINK-31508
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination, Runtime / REST
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
> Fix For: 1.18.0
>
>
> This is an umbrella ticket for 
> [FLIP-304|https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33088) Fix NullPointerException in RemoteTierConsumerAgent of tiered storage

2023-09-14 Thread Yuxin Tan (Jira)
Yuxin Tan created FLINK-33088:
-

 Summary: Fix NullPointerException in RemoteTierConsumerAgent of 
tiered storage
 Key: FLINK-33088
 URL: https://issues.apache.org/jira/browse/FLINK-33088
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.18.0
Reporter: Yuxin Tan
Assignee: Yuxin Tan


Currently, when getting a buffer from the RemoteTierConsumerAgent of tiered 
storage, a NullPointerException may be thrown; we should fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32925) Select executing Release Manager

2023-09-14 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge resolved FLINK-32925.
-
Resolution: Fixed

> Select executing Release Manager
> 
>
> Key: FLINK-32925
> URL: https://issues.apache.org/jira/browse/FLINK-32925
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergey Nuyanzin
>Assignee: Jing Ge
>Priority: Major
>
> h4. GPG Key
> You need to have a GPG key to sign the release artifacts. Please be aware of 
> the ASF-wide [release signing 
> guidelines|https://www.apache.org/dev/release-signing.html]. If you don’t 
> have a GPG key associated with your Apache account, please create one 
> according to the guidelines.
> Determine your Apache GPG Key and Key ID, as follows:
> {code:java}
> $ gpg --list-keys
> {code}
> This will list your GPG keys. One of these should reflect your Apache 
> account, for example:
> {code:java}
> --
> pub   2048R/845E6689 2016-02-23
> uid                  Nomen Nescio 
> sub   2048R/BA4D50BE 2016-02-23
> {code}
> In the example above, the key ID is the 8-digit hex string in the \{{pub}} 
> line: \{{{}845E6689{}}}.
> Now, add your Apache GPG key to the Flink’s \{{KEYS}} file in the [Apache 
> Flink release KEYS 
> file|https://dist.apache.org/repos/dist/release/flink/KEYS] repository at 
> [dist.apache.org|http://dist.apache.org/]. Follow the instructions listed at 
> the top of these files. (Note: Only PMC members have write access to the 
> release repository. If you end up getting 403 errors ask on the mailing list 
> for assistance.)
> Configure \{{git}} to use this key when signing code by giving it your key 
> ID, as follows:
> {code:java}
> $ git config --global user.signingkey 845E6689
> {code}
> You may drop the \{{--global}} option if you’d prefer to use this key for the 
> current repository only.
> You may wish to start \{{gpg-agent}} to unlock your GPG key only once using 
> your passphrase. Otherwise, you may need to enter this passphrase hundreds of 
> times. The setup for \{{gpg-agent}} varies based on operating system, but may 
> be something like this:
> {code:bash}
> $ eval $(gpg-agent --daemon --no-grab --write-env-file $HOME/.gpg-agent-info)
> $ export GPG_TTY=$(tty)
> $ export GPG_AGENT_INFO
> {code}
> h4. Access to Apache Nexus repository
> Configure access to the [Apache Nexus 
> repository|https://repository.apache.org/], which enables final deployment of 
> releases to the Maven Central Repository.
>  # You log in with your Apache account.
>  # Confirm you have appropriate access by finding \{{org.apache.flink}} under 
> \{{{}Staging Profiles{}}}.
>  # Navigate to your \{{Profile}} (top right drop-down menu of the page).
>  # Choose \{{User Token}} from the dropdown, then click \{{{}Access User 
> Token{}}}. Copy a snippet of the Maven XML configuration block.
>  # Insert this snippet twice into your global Maven \{{settings.xml}} file, 
> typically \{{{}${HOME}/.m2/settings.xml{}}}. The end result should look like 
> this, where \{{TOKEN_NAME}} and \{{TOKEN_PASSWORD}} are your secret tokens:
> {code:xml}
> <settings>
>    <servers>
>      <server>
>        <id>apache.releases.https</id>
>        <username>TOKEN_NAME</username>
>        <password>TOKEN_PASSWORD</password>
>      </server>
>      <server>
>        <id>apache.snapshots.https</id>
>        <username>TOKEN_NAME</username>
>        <password>TOKEN_PASSWORD</password>
>      </server>
>    </servers>
> </settings>
> {code}
> h4. Website development setup
> Get ready for updating the Flink website by following the [website 
> development 
> instructions|https://flink.apache.org/contributing/improve-website.html].
> h4. GNU Tar Setup for Mac (Skip this step if you are not using a Mac)
> The default tar application on Mac does not support GNU archive format and 
> defaults to Pax. This bloats the archive with unnecessary metadata that can 
> result in additional files when decompressing (see [1.15.2-RC2 vote 
> thread|https://lists.apache.org/thread/mzbgsb7y9vdp9bs00gsgscsjv2ygy58q]). 
> Install gnu-tar and create a symbolic link to use in preference of the 
> default tar program.
> {code:bash}
> $ brew install gnu-tar
> $ ln -s /usr/local/bin/gtar /usr/local/bin/tar
> $ which tar
> {code}
>  
> 
> h3. Expectations
>  * Release Manager’s GPG key is published to 
> [dist.apache.org|http://dist.apache.org/]
>  * Release Manager’s GPG key is configured in git configuration
>  * Release Manager's GPG key is configured as the default gpg key.
>  * Release Manager has \{{org.apache.flink}} listed under Staging Profiles in 
> Nexus
>  * Release Manager’s Nexus User Token is configured in settings.xml



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] swuferhong commented on a diff in pull request #23414: [hotfix][test] Fix test value source throw ClassCastException while push down none-existent partition

2023-09-14 Thread via GitHub


swuferhong commented on code in PR #23414:
URL: https://github.com/apache/flink/pull/23414#discussion_r1325831177


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##
@@ -1244,26 +1244,34 @@ public void applyPartitions(List<Map<String, String>> 
remainingPartitions) {
 } else {
 // we will read data from Collections.emptyList() if 
allPartitions is empty.
 // therefore, we should clear all data manually.
-remainingPartitions = (List<Map<String, String>>) 
Collections.emptyMap();
+remainingPartitions = createEmptyRemainingPartitions();
 this.data.put(Collections.emptyMap(), 
Collections.emptyList());
 }
 
 } else {
 this.allPartitions = remainingPartitions;
 if (remainingPartitions.isEmpty()) {
-remainingPartitions = (List<Map<String, String>>) 
Collections.emptyMap();
+remainingPartitions = createEmptyRemainingPartitions();
 }
 }
 // only keep the data in the remaining partitions
 this.data = pruneDataByRemainingPartitions(remainingPartitions, 
this.data);
 }
 
+private List<Map<String, String>> createEmptyRemainingPartitions() {

Review Comment:
   > What about using `Collections.singletonList(Collections.emptyMap())`?
   
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xuyangzhong commented on a diff in pull request #23414: [hotfix][test] Fix test value source throw ClassCastException while push down none-existent partition

2023-09-14 Thread via GitHub


xuyangzhong commented on code in PR #23414:
URL: https://github.com/apache/flink/pull/23414#discussion_r1325817864


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##
@@ -1244,26 +1244,34 @@ public void applyPartitions(List<Map<String, String>> 
remainingPartitions) {
 } else {
 // we will read data from Collections.emptyList() if 
allPartitions is empty.
 // therefore, we should clear all data manually.
-remainingPartitions = (List<Map<String, String>>) 
Collections.emptyMap();
+remainingPartitions = createEmptyRemainingPartitions();
 this.data.put(Collections.emptyMap(), 
Collections.emptyList());
 }
 
 } else {
 this.allPartitions = remainingPartitions;
 if (remainingPartitions.isEmpty()) {
-remainingPartitions = (List<Map<String, String>>) 
Collections.emptyMap();
+remainingPartitions = createEmptyRemainingPartitions();
 }
 }
 // only keep the data in the remaining partitions
 this.data = pruneDataByRemainingPartitions(remainingPartitions, 
this.data);
 }
 
+private List<Map<String, String>> createEmptyRemainingPartitions() {

Review Comment:
   What about using `Collections.singletonList(Collections.emptyMap())`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-32925) Select executing Release Manager

2023-09-14 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765114#comment-17765114
 ] 

Jing Ge edited comment on FLINK-32925 at 9/14/23 11:13 AM:
---

I am not sure if you have the same issue, but the symbolic link does not work 
for me on mac:

$ ln -s /usr/local/bin/gtar /usr/local/bin/tar

it should be /opt/homebrew/bin/gtar

And we don't need that if we can just use gtar.
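
A sketch that avoids hard-coding either prefix (assuming Homebrew's gnu-tar 
formula is installed; brew --prefix resolves to /opt/homebrew on Apple Silicon 
and /usr/local on Intel):

$ ln -s "$(brew --prefix)/bin/gtar" /usr/local/bin/tar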


was (Author: jingge):
I am not sure if you have the same issue, but the symbolic link does not work 
for me on mac: 

$ ln -s /usr/local/bin/gtar /usr/local/bin/tar

it should be /opt/homebrew/bin/gtar

And we don't need that if we can just use gtar

> Select executing Release Manager
> 
>
> Key: FLINK-32925
> URL: https://issues.apache.org/jira/browse/FLINK-32925
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergey Nuyanzin
>Assignee: Jing Ge
>Priority: Major
>
> h4. GPG Key
> You need to have a GPG key to sign the release artifacts. Please be aware of 
> the ASF-wide [release signing 
> guidelines|https://www.apache.org/dev/release-signing.html]. If you don’t 
> have a GPG key associated with your Apache account, please create one 
> according to the guidelines.
> Determine your Apache GPG Key and Key ID, as follows:
> {code:java}
> $ gpg --list-keys
> {code}
> This will list your GPG keys. One of these should reflect your Apache 
> account, for example:
> {code:java}
> --
> pub   2048R/845E6689 2016-02-23
> uid                  Nomen Nescio 
> sub   2048R/BA4D50BE 2016-02-23
> {code}
> In the example above, the key ID is the 8-digit hex string in the \{{pub}} 
> line: \{{{}845E6689{}}}.
> Now, add your Apache GPG key to the Flink’s \{{KEYS}} file in the [Apache 
> Flink release KEYS 
> file|https://dist.apache.org/repos/dist/release/flink/KEYS] repository at 
> [dist.apache.org|http://dist.apache.org/]. Follow the instructions listed at 
> the top of these files. (Note: Only PMC members have write access to the 
> release repository. If you end up getting 403 errors ask on the mailing list 
> for assistance.)
> Configure \{{git}} to use this key when signing code by giving it your key 
> ID, as follows:
> {code:java}
> $ git config --global user.signingkey 845E6689
> {code}
> You may drop the \{{--global}} option if you’d prefer to use this key for the 
> current repository only.
> You may wish to start \{{gpg-agent}} to unlock your GPG key only once using 
> your passphrase. Otherwise, you may need to enter this passphrase hundreds of 
> times. The setup for \{{gpg-agent}} varies based on operating system, but may 
> be something like this:
> {code:bash}
> $ eval $(gpg-agent --daemon --no-grab --write-env-file $HOME/.gpg-agent-info)
> $ export GPG_TTY=$(tty)
> $ export GPG_AGENT_INFO
> {code}
> h4. Access to Apache Nexus repository
> Configure access to the [Apache Nexus 
> repository|https://repository.apache.org/], which enables final deployment of 
> releases to the Maven Central Repository.
>  # You log in with your Apache account.
>  # Confirm you have appropriate access by finding \{{org.apache.flink}} under 
> \{{{}Staging Profiles{}}}.
>  # Navigate to your \{{Profile}} (top right drop-down menu of the page).
>  # Choose \{{User Token}} from the dropdown, then click \{{{}Access User 
> Token{}}}. Copy a snippet of the Maven XML configuration block.
>  # Insert this snippet twice into your global Maven \{{settings.xml}} file, 
> typically \{{{}${HOME}/.m2/settings.xml{}}}. The end result should look like 
> this, where \{{TOKEN_NAME}} and \{{TOKEN_PASSWORD}} are your secret tokens:
> {code:xml}
> <settings>
>    <servers>
>      <server>
>        <id>apache.releases.https</id>
>        <username>TOKEN_NAME</username>
>        <password>TOKEN_PASSWORD</password>
>      </server>
>      <server>
>        <id>apache.snapshots.https</id>
>        <username>TOKEN_NAME</username>
>        <password>TOKEN_PASSWORD</password>
>      </server>
>    </servers>
> </settings>
> {code}
> h4. Website development setup
> Get ready for updating the Flink website by following the [website 
> development 
> instructions|https://flink.apache.org/contributing/improve-website.html].
> h4. GNU Tar Setup for Mac (Skip this step if you are not using a Mac)
> The default tar application on Mac does not support GNU archive format and 
> defaults to Pax. This bloats the archive with unnecessary metadata that can 
> result in additional files when decompressing (see [1.15.2-RC2 vote 
> thread|https://lists.apache.org/thread/mzbgsb7y9vdp9bs00gsgscsjv2ygy58q]). 
> Install gnu-tar and create a symbolic link to use in preference of the 
> default tar program.
> {code:bash}
> $ brew install gnu-tar
> $ ln -s /usr/local/bin/gtar /usr/local/bin/tar
> $ which tar
> {code}
>  
> 
> h3. Expectations
>  * Release Manager’s GPG key is published to 
> [dist.apache.org|http://dist.apache.org/]
>  * Release Manager’s GPG key is 

[jira] [Commented] (FLINK-32925) Select executing Release Manager

2023-09-14 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765114#comment-17765114
 ] 

Jing Ge commented on FLINK-32925:
-

I am not sure if you have the same issue, but the symbolic link does not work 
for me on mac: 

$ ln -s /usr/local/bin/gtar /usr/local/bin/tar

it should be /opt/homebrew/bin/gtar

And we don't need that if we can just use gtar

> Select executing Release Manager
> 
>
> Key: FLINK-32925
> URL: https://issues.apache.org/jira/browse/FLINK-32925
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergey Nuyanzin
>Assignee: Jing Ge
>Priority: Major
>
> h4. GPG Key
> You need to have a GPG key to sign the release artifacts. Please be aware of 
> the ASF-wide [release signing 
> guidelines|https://www.apache.org/dev/release-signing.html]. If you don’t 
> have a GPG key associated with your Apache account, please create one 
> according to the guidelines.
> Determine your Apache GPG Key and Key ID, as follows:
> {code:java}
> $ gpg --list-keys
> {code}
> This will list your GPG keys. One of these should reflect your Apache 
> account, for example:
> {code:java}
> --
> pub   2048R/845E6689 2016-02-23
> uid                  Nomen Nescio 
> sub   2048R/BA4D50BE 2016-02-23
> {code}
> In the example above, the key ID is the 8-digit hex string in the \{{pub}} 
> line: \{{{}845E6689{}}}.
> Now, add your Apache GPG key to the Flink’s \{{KEYS}} file in the [Apache 
> Flink release KEYS 
> file|https://dist.apache.org/repos/dist/release/flink/KEYS] repository at 
> [dist.apache.org|http://dist.apache.org/]. Follow the instructions listed at 
> the top of these files. (Note: Only PMC members have write access to the 
> release repository. If you end up getting 403 errors ask on the mailing list 
> for assistance.)
> Configure \{{git}} to use this key when signing code by giving it your key 
> ID, as follows:
> {code:java}
> $ git config --global user.signingkey 845E6689
> {code}
> You may drop the \{{--global}} option if you’d prefer to use this key for the 
> current repository only.
> You may wish to start \{{gpg-agent}} to unlock your GPG key only once using 
> your passphrase. Otherwise, you may need to enter this passphrase hundreds of 
> times. The setup for \{{gpg-agent}} varies based on operating system, but may 
> be something like this:
> {code:bash}
> $ eval $(gpg-agent --daemon --no-grab --write-env-file $HOME/.gpg-agent-info)
> $ export GPG_TTY=$(tty)
> $ export GPG_AGENT_INFO
> {code}
> h4. Access to Apache Nexus repository
> Configure access to the [Apache Nexus 
> repository|https://repository.apache.org/], which enables final deployment of 
> releases to the Maven Central Repository.
>  # You log in with your Apache account.
>  # Confirm you have appropriate access by finding \{{org.apache.flink}} under 
> \{{{}Staging Profiles{}}}.
>  # Navigate to your \{{Profile}} (top right drop-down menu of the page).
>  # Choose \{{User Token}} from the dropdown, then click \{{{}Access User 
> Token{}}}. Copy a snippet of the Maven XML configuration block.
>  # Insert this snippet twice into your global Maven \{{settings.xml}} file, 
> typically \{{{}${HOME}/.m2/settings.xml{}}}. The end result should look like 
> this, where \{{TOKEN_NAME}} and \{{TOKEN_PASSWORD}} are your secret tokens:
> {code:xml}
> <settings>
>    <servers>
>      <server>
>        <id>apache.releases.https</id>
>        <username>TOKEN_NAME</username>
>        <password>TOKEN_PASSWORD</password>
>      </server>
>      <server>
>        <id>apache.snapshots.https</id>
>        <username>TOKEN_NAME</username>
>        <password>TOKEN_PASSWORD</password>
>      </server>
>    </servers>
> </settings>
> {code}
> h4. Website development setup
> Get ready for updating the Flink website by following the [website 
> development 
> instructions|https://flink.apache.org/contributing/improve-website.html].
> h4. GNU Tar Setup for Mac (Skip this step if you are not using a Mac)
> The default tar application on Mac does not support GNU archive format and 
> defaults to Pax. This bloats the archive with unnecessary metadata that can 
> result in additional files when decompressing (see [1.15.2-RC2 vote 
> thread|https://lists.apache.org/thread/mzbgsb7y9vdp9bs00gsgscsjv2ygy58q]). 
> Install gnu-tar and create a symbolic link to use in preference of the 
> default tar program.
> {code:bash}
> $ brew install gnu-tar
> $ ln -s /usr/local/bin/gtar /usr/local/bin/tar
> $ which tar
> {code}
>  
> 
> h3. Expectations
>  * Release Manager’s GPG key is published to 
> [dist.apache.org|http://dist.apache.org/]
>  * Release Manager’s GPG key is configured in git configuration
>  * Release Manager's GPG key is configured as the default gpg key.
>  * Release Manager has \{{org.apache.flink}} listed under Staging Profiles in 
> Nexus
>  * Release Manager’s Nexus User Token is configured in settings.xml



--
This message was sent by Atlassian Jira

[jira] [Assigned] (FLINK-31896) Extend web interface to support failure labels

2023-09-14 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo reassigned FLINK-31896:
--

Assignee: Panagiotis Garefalakis

> Extend web interface to support failure labels
> --
>
> Key: FLINK-31896
> URL: https://issues.apache.org/jira/browse/FLINK-31896
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33086) Protect failure enrichment against unhandled exceptions

2023-09-14 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo reassigned FLINK-33086:
--

Assignee: Panagiotis Garefalakis

> Protect failure enrichment against unhandled exceptions
> ---
>
> Key: FLINK-33086
> URL: https://issues.apache.org/jira/browse/FLINK-33086
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
>
> Existing 
> [labelFailure|https://github.com/apache/flink/blob/603181da811edb47c0d573492639a381fbbedc28/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java#L175]
>  async logic expects each FailureEnricher future to never fail (or to do its 
> own exception handling), but there is no way to enforce that, since enrichers 
> are implemented as pluggable components. This could result in throwing away 
> labels from other enrichers that completed successfully. 
> A better solution would be to handle the failures and LOG the errors.
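
A minimal sketch of that handling, assuming the FLIP-304 contract where each 
enricher returns a CompletableFuture of labels (the surrounding names are 
illustrative):

{code:java}
// Isolate each enricher so one failing future cannot discard the labels
// produced by the enrichers that completed successfully.
CompletableFuture<Map<String, String>> safeLabels =
        enricher.processFailure(cause, enricherContext)
                .exceptionally(
                        t -> {
                            LOG.warn(
                                    "Failure enricher {} threw; ignoring its labels.",
                                    enricher.getClass().getSimpleName(),
                                    t);
                            return Collections.emptyMap();
                        });
{code}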



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32932) Build Release Candidate: 1.18.0-rc1

2023-09-14 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge reassigned FLINK-32932:
---

Assignee: Jing Ge

> Build Release Candidate: 1.18.0-rc1
> ---
>
> Key: FLINK-32932
> URL: https://issues.apache.org/jira/browse/FLINK-32932
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Jing Ge
>Priority: Major
>
> The core of the release process is the build-vote-fix cycle. Each cycle 
> produces one release candidate. The Release Manager repeats this cycle until 
> the community approves one release candidate, which is then finalized.
> h4. Prerequisites
> Set up a few environment variables to simplify Maven commands that follow. 
> This identifies the release candidate being built. Start with {{RC_NUM}} 
> equal to 1 and increment it for each candidate:
> {code}
> RC_NUM="1"
> TAG="release-${RELEASE_VERSION}-rc${RC_NUM}"
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32964) KinesisStreamsSink can't renew credentials with WebIdentityTokenFileCredentialsProvider

2023-09-14 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765095#comment-17765095
 ] 

Aleksandr Pilipenko commented on FLINK-32964:
-

Hi [~pbenaoun],

I was not able to reproduce the described issue using WEB_IDENTITY_TOKEN on EKS. 
Tested with Flink 1.17.1, Flink Kubernetes Operator 1.6, and Kinesis connector 
4.1.

Both source and sink were able to authenticate successfully for over a day 
without any issues; the max IAM session duration was configured to 1 hour.

Based on this, I don't think the issue is caused by the credential provider 
being unable to renew credentials.

The exception you've shared indicates that the connector is already in the 
process of shutting down, and it is not the reason for the job failure.

Could you share any additional logs from around the time of the error? 

> KinesisStreamsSink can't renew credentials with 
> WebIdentityTokenFileCredentialsProvider
> --
>
> Key: FLINK-32964
> URL: https://issues.apache.org/jira/browse/FLINK-32964
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, 1.16.2, 1.17.1
>Reporter: PhilippeB
>Priority: Major
>
> (First time filing a ticket in the Flink community, please let me know if there 
> are any guidelines I need to follow.)
> I noticed a very strange behavior with the Kinesis Sink. I am using 
> Flink in containerized and Application (reactive) mode on EKS with high 
> availability on S3. 
> Kinesis is configured with an IAM role and appropriate policies. 
> {code:java}
> //Here a part of my flink-config.yaml:
> parallelism.default: 2
> scheduler-mode: reactive
> execution.checkpointing.interval: 10s
> env.java.opts.jobmanager: -Dkubernetes.max.concurrent.requests=200
> containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 200
> aws.credentials.provider: WEB_IDENTITY_TOKEN
> aws.credentials.role.arn: role
> aws.credentials.role.sessionName: session
> aws.credentials.webIdentityToken.file: 
> /var/run/secrets/eks.amazonaws.com/serviceaccount/token {code}
> When my project is deployed, the application and cluster work well, but once 
> the project has been running for about an hour, I suppose the IAM role 
> session needs to be renewed, and then the job starts crashing continuously.
> {code:java}
> 2023-08-24 10:35:55
> java.lang.IllegalStateException: Connection pool shut down
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>     at 
> org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63)
>     at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77)
>     at 
> 

[GitHub] [flink] 1996fanrui commented on a diff in pull request #22985: [FLINK-21883][scheduler] Implement cooldown period for adaptive scheduler

2023-09-14 Thread via GitHub


1996fanrui commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1325737044


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##
@@ -124,17 +154,33 @@ private void handleDeploymentFailure(ExecutionVertex 
executionVertex, JobExcepti
 
 @Override
 public void onNewResourcesAvailable() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 @Override
 public void onNewResourceRequirements() {
-maybeRescale();
+rescaleWhenCooldownPeriodIsOver();
 }
 
 private void maybeRescale() {
-if (context.shouldRescale(getExecutionGraph())) {
-getLogger().info("Can change the parallelism of job. Restarting 
job.");
+final Duration timeSinceLastRescale = timeSinceLastRescale();
+rescaleScheduled = false;
+final boolean shouldForceRescale =
+(scalingIntervalMax != null)
+&& (timeSinceLastRescale.compareTo(scalingIntervalMax) 
> 0)
+&& (lastRescale != Instant.EPOCH); // initial rescale 
is not forced
+if (shouldForceRescale || context.shouldRescale(getExecutionGraph())) {
+if (shouldForceRescale) {
+getLogger()
+.info(
+"Time since last rescale ({}) >  {} ({}). 
Force-changing the parallelism of the job. Restarting the job.",
+timeSinceLastRescale,
+
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+scalingIntervalMax);
+} else {
+getLogger().info("Can change the parallelism of the job. 
Restarting the job.");
+}
+lastRescale = Instant.now();
 context.goToRestarting(
 getExecutionGraph(),

Review Comment:
   Hi @zentol, thanks for your feedback here.
   
   Maybe you misunderstood here. I was just clarifying the difference between 
@echauchot and me. The current uncertainty is which cases we want to solve, 
rather than the option or how the code is implemented. Here is a case: 
   
   - Assuming the scalingIntervalMax is 60 minutes, the 
`jobmanager.adaptive-scheduler.min-parallelism-increase` is 2, and the expected 
parallelism is 100.
   - The job starts at 09:00:00, and it runs with parallelism 99.
   - At 09:40:00, the last TM comes.
   
   For this case, the runningTime is 40 minutes, which is less than 
scalingIntervalMax (60 minutes). And the resource diff is 1, which is less than 
2 (`jobmanager.adaptive-scheduler.min-parallelism-increase`).
   
   - @echauchot prefers to ignore this case and never rescale.
   - I prefer to cover this case; the solution is to schedule a rescale after 
`scalingIntervalMax` or `scalingIntervalMax - runningTime` (here 60 - 40 = 20 
minutes).
   
   The solution can be ignored for now; what do you think about this case? 
Should we cover it?
   
   @echauchot Please correct me if I'm wrong, thanks~
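   
   For the delay itself, a tiny sketch of what I mean (the field names are 
hypothetical):
   
   ```java
   // Wait only for the remainder of the interval before the deferred rescale.
   Duration runningTime = Duration.between(lastRescale, Instant.now());
   Duration delay = scalingIntervalMax.minus(runningTime);
   if (delay.isNegative()) {
       delay = Duration.ZERO; // interval already elapsed: rescale immediately
   }
   ```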
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] swuferhong commented on a diff in pull request #23412: [FLINK-33083] Properly apply ReadingMetadataSpec for a TableSourceScan

2023-09-14 Thread via GitHub


swuferhong commented on code in PR #23412:
URL: https://github.com/apache/flink/pull/23412#discussion_r1325700289


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java:
##
@@ -95,29 +92,19 @@ private DynamicTableSource getTableSource(FlinkContext 
context, FlinkTypeFactory
 FactoryUtil.createDynamicTableSource(
 factory,
 contextResolvedTable.getIdentifier(),
-resolvedCatalogTable,
+contextResolvedTable.getResolvedTable(),
 loadOptionsFromCatalogTable(contextResolvedTable, 
context),
 context.getTableConfig(),
 context.getClassLoader(),
 contextResolvedTable.isTemporary());
-// validate DynamicSource and apply Metadata
-DynamicSourceUtils.prepareDynamicSource(
-contextResolvedTable.getIdentifier().toString(),
-resolvedCatalogTable,
-tableSource,
-false,
-context.getTableConfig().getConfiguration());
 
 if (sourceAbilities != null) {
-//  Note: use DynamicSourceUtils.createProducedType to produce 
the type info so that
-//  keep consistent with sql2Rel phase which also called the 
method producing
-//  deterministic format (PHYSICAL COLUMNS + METADATA COLUMNS) 
when converts a given
-//  DynamicTableSource to a RelNode.
-// TODO should do a refactor(e.g., add serialized input type 
info into each
-//  SourceAbilitySpec so as to avoid this implicit logic 
dependency)
 RowType newProducedType =
-DynamicSourceUtils.createProducedType(
-contextResolvedTable.getResolvedSchema(), 
tableSource);
+(RowType)
+contextResolvedTable
+.getResolvedSchema()
+.toSourceRowDataType()
+.getLogicalType();

Review Comment:
   Hi, @dawidwys . For this case:
   ```
   String ddl =  "CREATE TABLE MyTable1 (\n"
   + "  metadata_0 int METADATA VIRTUAL,\n"
   + "  a0 int,\n"
   + "  a1 int,\n"
   + "  a2 int,\n"
   + "  ts STRING,\n "
   + "  rowtime as TO_TIMESTAMP(`ts`),\n"
   + "  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' 
SECOND\n"
   + ") WITH ( .
   + ")";
   ```
   which `METADATA` column defined as first column.  
   ```
   
DynamicSourceUtils.createProducedType(contextResolvedTable.getResolvedSchema(), 
tableSource)
   ``` 
   will get different schema order with
   ```
   contextResolvedTable.getResolvedSchema()
   .toSourceRowDataType()
   .getLogicalType()
   ```
   
   IMO, I think it's better to use the original order than to convert the order 
to `PHYSICAL COLUMNS + METADATA COLUMNS`, but this needs @twalthr to confirm.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33080) The checkpoint storage configured in the job level by config option will not take effect

2023-09-14 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-33080:

Component/s: Runtime / Configuration

> The checkpoint storage configured in the job level by config option will not 
> take effect
> 
>
> Key: FLINK-33080
> URL: https://issues.apache.org/jira/browse/FLINK-33080
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Runtime / State Backends
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> When we configure the checkpoint storage at the job level, it can only be 
> done through the following method:
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getCheckpointConfig().setCheckpointStorage(xxx); {code}
> or configure filesystem storage by config option 
> CheckpointingOptions.CHECKPOINTS_DIRECTORY through the following method:
> {code:java}
> Configuration configuration = new Configuration();
> configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);{code}
> However, configuring another type of checkpoint storage through the job-side 
> configuration, like the following, will not take effect:
> {code:java}
> Configuration configuration = new Configuration();
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, 
> "aaa.bbb.ccc.CustomCheckpointStorage");
> configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); 
> {code}
> This behavior is unexpected; configuring it this way should also take effect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33080) The checkpoint storage configured in the job level by config option will not take effect

2023-09-14 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-33080:

Component/s: Runtime / Checkpointing
 (was: Runtime / State Backends)

> The checkpoint storage configured in the job level by config option will not 
> take effect
> 
>
> Key: FLINK-33080
> URL: https://issues.apache.org/jira/browse/FLINK-33080
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Configuration
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> When we configure the checkpoint storage at the job level, it can only be 
> done through the following method:
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getCheckpointConfig().setCheckpointStorage(xxx); {code}
> or configure filesystem storage by config option 
> CheckpointingOptions.CHECKPOINTS_DIRECTORY through the following method:
> {code:java}
> Configuration configuration = new Configuration();
> configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);{code}
> However, configuring another type of checkpoint storage through the job-side 
> configuration, like the following, will not take effect:
> {code:java}
> Configuration configuration = new Configuration();
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, 
> "aaa.bbb.ccc.CustomCheckpointStorage");
> configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); 
> {code}
> This behavior is unexpected; configuring it this way should also take effect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] swuferhong commented on a diff in pull request #23412: [FLINK-33083] Properly apply ReadingMetadataSpec for a TableSourceScan

2023-09-14 Thread via GitHub


swuferhong commented on code in PR #23412:
URL: https://github.com/apache/flink/pull/23412#discussion_r1325700289


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java:
##
@@ -95,29 +92,19 @@ private DynamicTableSource getTableSource(FlinkContext 
context, FlinkTypeFactory
 FactoryUtil.createDynamicTableSource(
 factory,
 contextResolvedTable.getIdentifier(),
-resolvedCatalogTable,
+contextResolvedTable.getResolvedTable(),
 loadOptionsFromCatalogTable(contextResolvedTable, 
context),
 context.getTableConfig(),
 context.getClassLoader(),
 contextResolvedTable.isTemporary());
-// validate DynamicSource and apply Metadata
-DynamicSourceUtils.prepareDynamicSource(
-contextResolvedTable.getIdentifier().toString(),
-resolvedCatalogTable,
-tableSource,
-false,
-context.getTableConfig().getConfiguration());
 
 if (sourceAbilities != null) {
-//  Note: use DynamicSourceUtils.createProducedType to produce 
the type info so that
-//  keep consistent with sql2Rel phase which also called the 
method producing
-//  deterministic format (PHYSICAL COLUMNS + METADATA COLUMNS) 
when converts a given
-//  DynamicTableSource to a RelNode.
-// TODO should do a refactor(e.g., add serialized input type 
info into each
-//  SourceAbilitySpec so as to avoid this implicit logic 
dependency)
 RowType newProducedType =
-DynamicSourceUtils.createProducedType(
-contextResolvedTable.getResolvedSchema(), 
tableSource);
+(RowType)
+contextResolvedTable
+.getResolvedSchema()
+.toSourceRowDataType()
+.getLogicalType();

Review Comment:
   Hi, @dawidwys . For this case:
   ```
   String ddl =  "CREATE TABLE MyTable1 (\n"
   + "  metadata_0 int METADATA VIRTUAL,\n"
   + "  a0 int,\n"
   + "  a1 int,\n"
   + "  a2 int,\n"
   + "  ts STRING,\n "
   + "  rowtime as TO_TIMESTAMP(`ts`),\n"
   + "  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' 
SECOND\n"
   + ") WITH ( .
   + ")";
   ```
   which `METADATA` column defined as first column.  
   ```
   
DynamicSourceUtils.createProducedType(contextResolvedTable.getResolvedSchema(), 
tableSource)
   ``` 
   will get different schema order with
   ```
   contextResolvedTable.getResolvedSchema()
   .toSourceRowDataType()
   .getLogicalType()
   ```
   
   IMO, I think it's better to use the original order than to convert the order 
to `PHYSICAL COLUMNS + METADATA COLUMNS`, but this needs @twalthr to confirm it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30025) Unified the max display column width for SqlClient and Table API in both Streaming and Batch execMode

2023-09-14 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge updated FLINK-30025:

Fix Version/s: 1.18.0

> Unified the max display column width for SqlClient and Table API in both 
> Streaming and Batch execMode
> -
>
> Key: FLINK-30025
> URL: https://issues.apache.org/jira/browse/FLINK-30025
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.16.0, 1.15.2
>Reporter: Jing Ge
>Assignee: Jing Ge
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> FLIP-279 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-279+Unified+the+max+display+column+width+for+SqlClient+and+Table+APi+in+both+Streaming+and+Batch+execMode]
>  
> Background info:
> table.execute().print() can only use the default max column width. When 
> running a Table API program "table.execute().print();", columns with long 
> string values are truncated to 30 chars. E.g.:
> !https://static.dingtalk.com/media/lALPF6XTM7ZO1FXNASrNBEI_1090_298.png_620x1q90.jpg?auth_bizType=%27IM%27=im|width=457,height=125!
> I tried to set the max width with: 
> tEnv.getConfig.getConfiguration.setInteger("sql-client.display.max-column-width",
>  100); It has no effect. How can I set the max width?
> Here is the example code:
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env)
> tEnv.getConfig.getConfiguration.setInteger("sql-client.display.max-column-width",
>  100)
> val orderA = env
>   .fromCollection(Seq(Order(1L, "beer", 3), Order(1L, 
> "diaper-{-}{{-}}.diaper{{-}}{-}{-}.diaper{-}{-}{{-}}.diaper{{-}}{-}-.", 4), 
> Order(3L, "rubber", 2)))
>   .toTable(tEnv)
> orderA.execute().print()
>  
> "sql-client.display.max-column-width" seems only work in cli: SET 
> 'sql-client.display.max-column-width' = '40';
> While using Table API, by default, the DEFAULT_MAX_COLUMN_WIDTH in PrintStyle 
> is used now. It should be configurable. 
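
With FLIP-279 this becomes configurable from the Table API as well; a minimal 
sketch (the option key follows the FLIP and should be treated as an assumption 
until verified against the 1.18 docs):

{code:java}
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Unified option that also applies to Table API printing, replacing the
// client-only sql-client.display.max-column-width.
tEnv.getConfig().set("table.display.max-column-width", "100");

tEnv.fromValues("a-very-long-string-value-that-would-otherwise-be-truncated")
        .execute()
        .print();
{code}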



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] dawidwys commented on a diff in pull request #23412: [FLINK-33083] Properly apply ReadingMetadataSpec for a TableSourceScan

2023-09-14 Thread via GitHub


dawidwys commented on code in PR #23412:
URL: https://github.com/apache/flink/pull/23412#discussion_r1325658492


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java:
##
@@ -95,29 +92,19 @@ private DynamicTableSource getTableSource(FlinkContext 
context, FlinkTypeFactory
 FactoryUtil.createDynamicTableSource(
 factory,
 contextResolvedTable.getIdentifier(),
-resolvedCatalogTable,
+contextResolvedTable.getResolvedTable(),
 loadOptionsFromCatalogTable(contextResolvedTable, 
context),
 context.getTableConfig(),
 context.getClassLoader(),
 contextResolvedTable.isTemporary());
-// validate DynamicSource and apply Metadata
-DynamicSourceUtils.prepareDynamicSource(
-contextResolvedTable.getIdentifier().toString(),
-resolvedCatalogTable,
-tableSource,
-false,
-context.getTableConfig().getConfiguration());
 
 if (sourceAbilities != null) {
-//  Note: use DynamicSourceUtils.createProducedType to produce 
the type info so that
-//  keep consistent with sql2Rel phase which also called the 
method producing
-//  deterministic format (PHYSICAL COLUMNS + METADATA COLUMNS) 
when converts a given
-//  DynamicTableSource to a RelNode.
-// TODO should do a refactor(e.g., add serialized input type 
info into each
-//  SourceAbilitySpec so as to avoid this implicit logic 
dependency)
 RowType newProducedType =
-DynamicSourceUtils.createProducedType(
-contextResolvedTable.getResolvedSchema(), 
tableSource);
+(RowType)
+contextResolvedTable
+.getResolvedSchema()
+.toSourceRowDataType()
+.getLogicalType();

Review Comment:
   Not sure if I understand your question. 
   
   Before applying any specs the table has a schema as described by 
   ```
   contextResolvedTable.getResolvedSchema()
   .toSourceRowDataType()
   .getLogicalType()
   ```
   
   Only once the `ReadingMetadataSpec` is applied it will change the type to 
`spec.getProducedType().get()`. In this particular case of reading metadata in 
a `TableSourceScan` it is equal to 
`DynamicSourceUtils.createProducedType(contextResolvedTable.getResolvedSchema(),
 tableSource);`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #23417: [FLINK-33030][python]Add python 3.11 support

2023-09-14 Thread via GitHub


gaborgsomogyi commented on code in PR #23417:
URL: https://github.com/apache/flink/pull/23417#discussion_r1325644523


##
tools/releasing/create_binary_release.sh:
##
@@ -129,8 +129,8 @@ make_python_release() {
   cp ${pyflink_actual_name} "${PYTHON_RELEASE_DIR}/${pyflink_release_name}"
 
   wheel_packages_num=0
-  # py38,py39,py310 for mac 10.9, 11.0 and linux (9 wheel packages)
-  EXPECTED_WHEEL_PACKAGES_NUM=9
+  # py38,py39,py310,py311 for mac 10.9, 11.0 and linux (12 wheel packages)
+  EXPECTED_WHEEL_PACKAGES_NUM=12

Review Comment:
   +3 because each Python version ships one wheel per platform (mac 10.9, mac 11.0, and linux), so 4 versions × 3 platforms = 12.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23417: [FLINK-33030][python]Add python 3.11 support

2023-09-14 Thread via GitHub


flinkbot commented on PR #23417:
URL: https://github.com/apache/flink/pull/23417#issuecomment-1719070154

   
   ## CI report:
   
   * 721f303875656d69ae110b0e0cfb9d12ed40c2de UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33030) Add python 3.11 support

2023-09-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33030:
---
Labels: pull-request-available  (was: )

> Add python 3.11 support
> ---
>
> Key: FLINK-33030
> URL: https://issues.apache.org/jira/browse/FLINK-33030
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.19.0
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   >