[jira] [Commented] (FLINK-34562) Port Debezium Avro Confluent changes (FLINK-34509) to Chinese
[ https://issues.apache.org/jira/browse/FLINK-34562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825154#comment-17825154 ] Vincent Woo commented on FLINK-34562: - I'm willing to contribute; could someone assign this to me? > Port Debezium Avro Confluent changes (FLINK-34509) to Chinese > - > > Key: FLINK-34562 > URL: https://issues.apache.org/jira/browse/FLINK-34562 > Project: Flink > Issue Type: Improvement > Components: chinese-translation, Documentation > Reporter: Lorenzo Affetti > Priority: Minor > > Port the changes applied in > [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34509] to the > Chinese version of the documentation -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34638) Support default value of table column.
LvYanquan created FLINK-34638: - Summary: Support default value of table column. Key: FLINK-34638 URL: https://issues.apache.org/jira/browse/FLINK-34638 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: LvYanquan Fix For: cdc-3.1.0 Support default values of table columns specified as literal constants. -- This message was sent by Atlassian Jira (v8.20.10#820010)
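For readers skimming the digest, here is a minimal, JDK-only sketch of what a literal column default means for a CDC pipeline: when an upstream change event carries no value for a column, the default declared in the schema fills it in. Everything below (the class and column names) is a hypothetical illustration, not the Flink CDC API.

```java
import java.util.HashMap;
import java.util.Map;

public class DefaultValueFill {
    public static void main(String[] args) {
        // Literal defaults declared in the (hypothetical) table schema.
        Map<String, Object> schemaDefaults = Map.of("status", "pending", "retries", 0);

        // An upstream record that carries no value for 'status'/'retries'.
        Map<String, Object> row = new HashMap<>();
        row.put("id", 42);

        // Fill in every missing column from its declared default.
        schemaDefaults.forEach(row::putIfAbsent);
        System.out.println(row); // e.g. {id=42, retries=0, status=pending}
    }
}
```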
Re: [PR] [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery [flink]
ljz2051 commented on code in PR #24402: URL: https://github.com/apache/flink/pull/24402#discussion_r1519145995 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java: ## @@ -219,6 +219,18 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) { snapshot = loadTaskStateSnapshot(checkpointID); } +// Even if local recovery is disabled, it is still necessary to load the TaskStateSnapshot +// so that it can be managed by the TaskLocalStateStore. +if (!localRecoveryConfig.isLocalRecoveryEnabled()) { Review Comment: It makes sense. I have changed the log level from "info" to "debug". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
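For context, a JDK-only stand-in for the pattern in the quoted diff. The names mimic the diff, but this is a simplified sketch, not the actual Flink classes. It shows both points of the exchange: the snapshot is loaded even when local recovery is disabled (so the local state store can still manage and later discard its files), and the message sits at debug level rather than info.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class LocalStateLoadSketch {
    private static final Logger LOG = Logger.getLogger("TaskLocalStateStore");

    record TaskStateSnapshot(long checkpointId) {}

    static TaskStateSnapshot loadTaskStateSnapshot(long checkpointId) {
        return new TaskStateSnapshot(checkpointId); // pretend this reads from local disk
    }

    static TaskStateSnapshot retrieveLocalState(long checkpointId, boolean localRecoveryEnabled) {
        // Load unconditionally: even with local recovery off, the store must
        // know about the snapshot so it can discard its files later.
        TaskStateSnapshot snapshot = loadTaskStateSnapshot(checkpointId);
        if (!localRecoveryEnabled) {
            // DEBUG (FINE here) rather than INFO: this branch is hit on every
            // restore when local recovery is off, so INFO would be noisy.
            LOG.log(Level.FINE, "Local recovery disabled; loaded snapshot for checkpoint "
                    + checkpointId + " only so its files can be managed.");
        }
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println(retrieveLocalState(42L, false));
    }
}
```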
Re: [PR] [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery [flink]
ljz2051 commented on code in PR #24402: URL: https://github.com/apache/flink/pull/24402#discussion_r1519145206 ## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java: ## @@ -35,7 +35,7 @@ import java.util.Collection; import java.util.Collections; -import static org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY; +import static org.apache.flink.configuration.StateRecoveryOptions.LOCAL_RECOVERY; Review Comment: Yeah, I have noticed it, and @fredia is fixing it in FLINK-34624. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]
minchowang commented on PR #84: URL: https://github.com/apache/flink-connector-pulsar/pull/84#issuecomment-1987561329 > @minchowang I submitted a fix in #85 along with a unit test that reproduces the issue. Replacing `%` with `floorMod` is simpler and keeps the function deterministic. I have tried to use Math.abs(), but one subtask is never assigned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
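The arithmetic behind this exchange is worth spelling out: a subtask index derived with `%` can be negative for negative hash codes, and `Math.abs` fails for `Integer.MIN_VALUE` (its absolute value overflows back to a negative number, which is consistent with the "one subtask is never assigned" symptom above), while `Math.floorMod` always lands in `[0, parallelism)`. A JDK-only sketch; `subtaskFor` and the topic name are illustrative, not the connector's API.

```java
public class SubtaskAssignment {

    static int subtaskFor(String topicName, int parallelism) {
        // Math.floorMod always returns a value in [0, parallelism),
        // even for negative hash codes and for Integer.MIN_VALUE.
        return Math.floorMod(topicName.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE)); // -2147483648: abs overflows
        System.out.println(Integer.MIN_VALUE % 4);       // 0 here, but -3 % 4 == -3
        System.out.println(Math.floorMod(-3, 4));        // 3: always non-negative
        System.out.println(subtaskFor("persistent://tenant/ns/topic", 4));
    }
}
```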
Re: [PR] [doc]Fix wrong function name [flink]
flinkbot commented on PR #24478: URL: https://github.com/apache/flink/pull/24478#issuecomment-1987525147 ## CI report: * 0301a286157fc86105ca53cd0007c458bbc75d4a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1519078269 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link PartitionReduceOperator} is used to apply the reduce transformation on all records of + * each partition. Each partition contains all records of a subtask. + */ +@Internal +public class PartitionReduceOperator extends AbstractUdfStreamOperator> +implements OneInputStreamOperator, BoundedOneInput { + +private final ReduceFunction reduceFunction; + +private IN currentRecord = null; + +private long lastWatermarkTimestamp = Long.MIN_VALUE; + +public PartitionReduceOperator(ReduceFunction reduceFunction) { +super(reduceFunction); +this.reduceFunction = reduceFunction; +} + +@Override +public void setup( +StreamTask containingTask, StreamConfig config, Output> output) { +super.setup(containingTask, config, output); +} + +@Override +public void processElement(StreamRecord element) throws Exception { +if (currentRecord == null) { +currentRecord = element.getValue(); Review Comment: The job will turn off checkpoint when operators that have `OutputOnlyAfterEndOfStream=true` are running, which means those operators won't support checkpoint. BTW, the operators on keyed stream with `OutputOnlyAfterEndOfStream=true` will be added sorted input, which doesn't support checkpoint. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
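Stripped of the operator plumbing, the reduce logic under review folds every record of the partition into a single value and emits it only at end of input, which is also why checkpointing is disabled for such `OutputOnlyAfterEndOfStream` operators: there is no meaningful intermediate result to commit. A JDK-only sketch under that reading; `BinaryOperator` stands in for Flink's `ReduceFunction`.

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class PartitionReduceSketch<T> {

    private final BinaryOperator<T> reduceFunction;
    private T currentRecord; // null until the first element of the partition arrives

    PartitionReduceSketch(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    void processElement(T value) {
        currentRecord = (currentRecord == null)
                ? value // first record of the partition
                : reduceFunction.apply(currentRecord, value);
    }

    T endInput() {
        return currentRecord; // emitted once, only after all input was consumed
    }

    public static void main(String[] args) {
        PartitionReduceSketch<Integer> op = new PartitionReduceSketch<>(Integer::sum);
        for (int v : List.of(1, 2, 3)) {
            op.processElement(v);
        }
        System.out.println(op.endInput()); // 6
    }
}
```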
[PR] [doc]Fix wrong function name [flink]
wslkxup opened a new pull request, #24478: URL: https://github.com/apache/flink/pull/24478 The function name LOCA_LTIME does not exist; it should be LOCALTIME. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1519076085 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link PartitionReduceOperator} is used to apply the reduce transformation on all records of + * each partition. Each partition contains all records of a subtask. + */ +@Internal +public class PartitionReduceOperator extends AbstractUdfStreamOperator> +implements OneInputStreamOperator, BoundedOneInput { + +private final ReduceFunction reduceFunction; + +private IN currentRecord = null; + +private long lastWatermarkTimestamp = Long.MIN_VALUE; + +public PartitionReduceOperator(ReduceFunction reduceFunction) { +super(reduceFunction); +this.reduceFunction = reduceFunction; +} + +@Override +public void setup( +StreamTask containingTask, StreamConfig config, Output> output) { +super.setup(containingTask, config, output); +} Review Comment: This method is redundant. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1519075588 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalAsyncIteratorImpl.java: ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkState; + +/** The default implementation of {@link InternalAsyncIterator}. */ +@Internal +public class InternalAsyncIteratorImpl implements InternalAsyncIterator { Review Comment: I've refactored the implementation of the iterator for MapPartitionFunction and make the logic of the code clearer. PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1519075382 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MapPartitionOperator.java: ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.ExceptionUtils; + +/** + * The {@link MapPartitionOperator} is used to process all records in each partition on non-keyed + * stream. Each partition contains all records of a subtask. + */ +@Internal +public class MapPartitionOperator +extends AbstractUdfStreamOperator> +implements OneInputStreamOperator, BoundedOneInput { + +private final MapPartitionFunction function; + +private long lastWatermarkTimestamp = Long.MIN_VALUE; + +private InternalAsyncIterator iterator; + +public MapPartitionOperator(MapPartitionFunction function) { +super(function); +this.function = function; +} + +@Override +public void setup( +StreamTask containingTask, +StreamConfig config, +Output> output) { +super.setup(containingTask, config, output); +this.iterator = new InternalAsyncIteratorImpl<>(); +this.iterator.registerUDF( +iterator -> { +TimestampedCollector outputCollector = new TimestampedCollector<>(output); +try { +function.mapPartition(() -> iterator, outputCollector); Review Comment: I've set the operator to be `ChainingStrategy.NEVER`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed
[ https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825132#comment-17825132 ] Hongshun Wang commented on FLINK-34634: --- [~ruanhang1993] , [~renqs] , [~Leonard] , CC > Restarting the job will not read the changelog anymore if it stops before the > synchronization of meta information is complete and some table is removed > --- > > Key: FLINK-34634 > URL: https://issues.apache.org/jira/browse/FLINK-34634 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Hongshun Wang >Priority: Major > Fix For: cdc-3.1.0 > > Attachments: image-2024-03-09-15-25-26-187.png, > image-2024-03-09-15-27-46-073.png > > > h3. What's the problem > Once, I removed a table from the option and then restarted the job from the > savepoint, but the job couldn't read the binlog anymore. When I checked the > logs, I found an Error level log stating: > ' The enumerator received invalid request meta group id 6, the valid meta > group id range is [0, 4].' > It appears that the Reader is requesting more splits than the Enumerator is > aware of. > However, the code should indeed remove redundant split information from the > Reader as seen in > [https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does > this issue occur? > > h3. why occurs > !image-2024-03-09-15-25-26-187.png|width=751,height=329! > Upon examining the code, I discovered the cause. If the job stops before > completing all the split meta information and then restarts, this issue > occurs. Suppose that the totalFinishedSplitSize of binlogSplit in the Reader > is 6, and no meta information has been synchronized, leaving the > finishedSnapshotSplitInfos of binlogSplit in the Reader empty. After > restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals (6 > - (0 - 0)) which is still 6, but in the Enumerator, it is only 4(the removed > table have two split). This could lead to an out-of-range request. > !image-2024-03-09-15-27-46-073.png|width=755,height=305! > h3. How to reproduce > * Add Thread.sleep(1000L) in > com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents > to postpone split meta infos synchronization. > {code:java} > public void handleSourceEvents(SourceEvent sourceEvent) { > else if (sourceEvent instanceof BinlogSplitMetaEvent) { > LOG.debug( > "Source reader {} receives binlog meta with group id {}.", > subtaskId, > ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId()); > try { > Thread.sleep(1000L); > } catch (InterruptedException e) { > throw new RuntimeException(e); > } > fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent); > } {code} > * Add Thread.sleep(500L) in > com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne > to trigger savepoint before meta infos synchronization finishes. > > {code:java} > // step 2: execute insert and trigger savepoint with all tables added > { > // ..ingore > waitForSinkSize("sink", fetchedDataList.size()); > Thread.sleep(500L); > assertEqualsInAnyOrder(fetchedDataList, > TestValuesTableFactory.getRawResults("sink")); > finishedSavePointPath = triggerSavepointWithRetry(jobClient, > savepointDirectory); > jobClient.cancel().get(); > } > // test removing table one by one, note that there should be at least one > table remaining > for (int round = 0; round < captureAddressTables.length - 1; round++) { > ... 
> } > {code} > > * Add chunk-meta.group.size =2 in > com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement > Then, run > test(com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable), > the error log will occur. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
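The arithmetic in the report, spelled out (numbers taken from the description above; the variable names are illustrative):

```java
public class MetaGroupMismatch {
    public static void main(String[] args) {
        int readerTotalBeforeStop = 6; // includes 2 splits of the later-removed table
        int metaInfosSynced = 0;       // job stopped before any meta synchronization
        // Restore keeps the stale total: 6 - (0 - 0) = 6.
        int readerTotalAfterRestore = readerTotalBeforeStop - (metaInfosSynced - 0);
        int enumeratorTotal = 4;       // the removed table's 2 splits are gone
        System.out.println(readerTotalAfterRestore > enumeratorTotal
                ? "reader may request meta group ids beyond the enumerator's range"
                : "consistent");
    }
}
```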
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1519074254 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MapPartitionOperator.java: ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.ExceptionUtils; + +/** + * The {@link MapPartitionOperator} is used to process all records in each partition on non-keyed + * stream. Each partition contains all records of a subtask. + */ +@Internal +public class MapPartitionOperator +extends AbstractUdfStreamOperator> +implements OneInputStreamOperator, BoundedOneInput { + +private final MapPartitionFunction function; + +private long lastWatermarkTimestamp = Long.MIN_VALUE; + +private InternalAsyncIterator iterator; + +public MapPartitionOperator(MapPartitionFunction function) { +super(function); +this.function = function; +} + +@Override +public void setup( Review Comment: The `open` method is now invoked later, and the `setup` will initialize the components needed for the operator's execution. It is more appropriate within the open method. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1519073380 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalAsyncIteratorImpl.java: ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkState; + +/** The default implementation of {@link InternalAsyncIterator}. */ +@Internal +public class InternalAsyncIteratorImpl implements InternalAsyncIterator { + +/** + * Max number of caches. + * + * The constant defines the maximum number of caches that can be created. Its value is set to + * 100, which is considered sufficient for most parallel jobs. Each cache is a record and + * occupies a minimal amount of memory so the value is not excessively large. + */ +private static final int DEFAULT_MAX_CACHE_NUM = 100; + +/** The lock to ensure consistency between task main thread and udf executor. */ +private final Lock lock = new ReentrantLock(); + +/** The condition of lock. */ +private final Condition condition = lock.newCondition(); + +/** The task udf executor. */ +private final Executor udfExecutor = +Executors.newFixedThreadPool(1, new ExecutorThreadFactory("TaskUDFExecutor")); + +/** The queue to store record caches. */ +@GuardedBy("lock") +private final Queue recordCaches = new LinkedList<>(); + +/** The flag to represent the finished state of udf. */ +@GuardedBy("lock") +private boolean isUDFFinished = false; + +/** The flag to represent the closed state of this iterator. 
*/ +@GuardedBy("lock") +private boolean isClosed = false; + +@Override +public boolean hasNext() { +return supplyWithLock( +() -> { +if (recordCaches.size() > 0) { +return true; +} else if (isClosed) { +return false; +} else { +waitToGetStatus(); +return hasNext(); +} +}); +} + +@Override +public IN next() { +return supplyWithLock( +() -> { +IN record; +if (recordCaches.size() > 0) { +record = recordCaches.poll(); +if (!isClosed) { +notifyStatus(); +} +return record; +} +waitToGetStatus(); +if (recordCaches.size() == 0) { +checkState(isClosed); +return null; +} +return recordCaches.poll(); +}); +} + +@Override +public void registerUDF(Consumer> udf) { +checkState(!isClosed); Review Comment: I've refactored the implementation of the iterator for MapPartitionFunction and make the logic of the code clearer. PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
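For readers following this thread: the hand-off that `InternalAsyncIteratorImpl` builds with an explicit lock and condition can be pictured with a JDK `BlockingQueue`. The task thread enqueues records, and the UDF thread consumes them through an `Iterator` view. This is a simplified stand-in sketch, not the refactored Flink class.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AsyncRecordIterator<T> implements Iterator<T> {

    // Bounded, mirroring the capped record cache in the reviewed code.
    private final BlockingQueue<Optional<T>> queue = new ArrayBlockingQueue<>(100);
    private Optional<T> lookAhead; // buffered element; Optional.empty() marks end of input

    /** Called by the producing (task) thread for every record. */
    public void emit(T record) throws InterruptedException {
        queue.put(Optional.of(record));
    }

    /** Called by the producing thread at end of input. */
    public void close() throws InterruptedException {
        queue.put(Optional.empty());
    }

    @Override
    public boolean hasNext() {
        if (lookAhead == null) {
            try {
                lookAhead = queue.take(); // blocks until a record or end-of-input arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return lookAhead.isPresent();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T record = lookAhead.get();
        lookAhead = null;
        return record;
    }
}
```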
Re: [PR] Draft PR [flink]
1996fanrui closed pull request #24393: Draft PR URL: https://github.com/apache/flink/pull/24393 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1519071934 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalAsyncIteratorImpl.java: ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkState; + +/** The default implementation of {@link InternalAsyncIterator}. */ +@Internal +public class InternalAsyncIteratorImpl implements InternalAsyncIterator { + +/** + * Max number of caches. + * + * The constant defines the maximum number of caches that can be created. Its value is set to + * 100, which is considered sufficient for most parallel jobs. Each cache is a record and + * occupies a minimal amount of memory so the value is not excessively large. + */ +private static final int DEFAULT_MAX_CACHE_NUM = 100; + +/** The lock to ensure consistency between task main thread and udf executor. */ +private final Lock lock = new ReentrantLock(); + +/** The condition of lock. */ +private final Condition condition = lock.newCondition(); + +/** The task udf executor. */ +private final Executor udfExecutor = +Executors.newFixedThreadPool(1, new ExecutorThreadFactory("TaskUDFExecutor")); Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34152] Add an option to scale memory when downscaling [flink-kubernetes-operator]
1996fanrui commented on PR #786: URL: https://github.com/apache/flink-kubernetes-operator/pull/786#issuecomment-1987512556 > @1996fanrui I'm merging but I'll take any comments from your side that you might have reviewing this later on. Thanks @mxm for the improvement and ping! Sorry for the late response here. I just finished my vacation; I will take a look within the next two days. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1519071465 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java: ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.MapPartitionOperator; +import org.apache.flink.streaming.api.operators.PartitionAggregateOperator; +import org.apache.flink.streaming.api.operators.PartitionReduceOperator; +import org.apache.flink.streaming.api.operators.sortpartition.SortPartitionOperator; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; + +import static org.apache.flink.streaming.api.operators.sortpartition.SortPartitionOperator.DEFAULT_SORTPARTITION_MANAGE_MEMORY_WEIGHT; + +/** + * {@link PartitionWindowedStream} represents a data stream that collects records of each partition + * into a separate full window. Each partition contains all records of a subtask. The window + * emission will be triggered at the end of input. + * + * @param The type of the elements in this stream. + */ +@PublicEvolving +public class PartitionWindowedStream { + +private final StreamExecutionEnvironment environment; + +private final DataStream input; + +public PartitionWindowedStream(StreamExecutionEnvironment environment, DataStream input) { +this.environment = environment; +this.input = input; +} + +/** + * Process the records of the window by {@link MapPartitionFunction}. + * + * @param mapPartitionFunction The map partition function. + * @param The type of map partition result. + * @return The data stream with map partition result. + */ +public SingleOutputStreamOperator mapPartition( +MapPartitionFunction mapPartitionFunction) { +if (mapPartitionFunction == null) { +throw new NullPointerException("The map partition function must not be null."); +} Review Comment: Good point. I've added `checkNotNull` in the similar positions. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
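The style the review converged on replaces hand-rolled `if (x == null) throw new NullPointerException(...)` blocks with a single `checkNotNull` call. A runnable sketch using the JDK's `Objects.requireNonNull`, which behaves like Flink's `Preconditions.checkNotNull` for this purpose; the method and message below are illustrative.

```java
import java.util.Objects;
import java.util.function.Function;

public class CheckNotNullStyle {

    static <I, O> Function<I, O> mapPartition(Function<I, O> mapPartitionFunction) {
        // One line instead of an if/throw block; fails fast with a clear message.
        return Objects.requireNonNull(
                mapPartitionFunction, "The map partition function must not be null.");
    }

    public static void main(String[] args) {
        System.out.println(mapPartition((String s) -> s.length()).apply("flink")); // 5
    }
}
```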
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1519070714 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link AbstractSortPartitionOperator} is the base class of sort partition operator, which + * provides shared construction methods and utility functions. + * + * @param The type of input record. + * @param The type used to sort the records, which may be different from the INPUT_TYPE. + * For example, if the input record is sorted according to the selected key by {@link + * KeySelector}, the selected key should also be written to {@link ExternalSorter} with the + * input record to avid repeated key selections. In this case, the type used to sort the records + * will be a tuple containing both the selected key and record. + */ +@Internal +@SuppressWarnings("unchecked") +public abstract class AbstractSortPartitionOperator +extends AbstractStreamOperator +implements OneInputStreamOperator, BoundedOneInput { + +/** The default manage memory weight of sort partition operator. 
*/ +public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128; Review Comment: I've added two config options to configure the managed memory weight for the SortPartition API on `KeyedPartitionWindowedStream` and `NonKeyedPartitionWindowedStream`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
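A plausible shape for the two options mentioned, sketched with Flink's real `ConfigOptions` builder; the option keys, field names, and defaults below are invented for illustration and are not necessarily the keys that were merged.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class SortPartitionMemoryOptions {

    // Invented key, for illustration only.
    public static final ConfigOption<Integer> NON_KEYED_SORT_PARTITION_MEMORY_WEIGHT =
            ConfigOptions.key("execution.sort-partition.non-keyed.managed-memory-weight")
                    .intType()
                    .defaultValue(128)
                    .withDescription(
                            "Managed memory weight of sortPartition on NonKeyedPartitionWindowedStream.");

    // Invented key, for illustration only.
    public static final ConfigOption<Integer> KEYED_SORT_PARTITION_MEMORY_WEIGHT =
            ConfigOptions.key("execution.sort-partition.keyed.managed-memory-weight")
                    .intType()
                    .defaultValue(128)
                    .withDescription(
                            "Managed memory weight of sortPartition on KeyedPartitionWindowedStream.");
}
```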
[jira] [Commented] (FLINK-31663) Add ARRAY_EXCEPT supported in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-31663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825131#comment-17825131 ] Jacky Lau commented on FLINK-31663: --- Hi [~snuyanzin], what is your opinion? > Add ARRAY_EXCEPT supported in SQL & Table API > - > > Key: FLINK-31663 > URL: https://issues.apache.org/jira/browse/FLINK-31663 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API > Reporter: luoyuxia > Assignee: Hanyu Zheng > Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34556][table] Migrate EnumerableToLogicalTableScan to java. [flink]
liuyongvs commented on PR #24421: URL: https://github.com/apache/flink/pull/24421#issuecomment-1987508497 Hi @snuyanzin, will you also help review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34503][table] Migrate JoinDeriveNullFilterRule to java. [flink]
liuyongvs commented on PR #24373: URL: https://github.com/apache/flink/pull/24373#issuecomment-1987508136 Hi @snuyanzin, will you also help review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java. [flink]
liuyongvs commented on code in PR #24423: URL: https://github.com/apache/flink/pull/24423#discussion_r1519069215 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.Util; +import org.immutables.value.Value; + +import java.util.List; + +import static org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition; + +/** + * Planner rule that replaces distinct {@link org.apache.calcite.rel.core.Intersect} with a distinct + * {@link org.apache.calcite.rel.core.Aggregate} on a SEMI {@link org.apache.calcite.rel.core.Join}. + * + * Only handle the case of input size 2. + */ +@Value.Enclosing +public class ReplaceIntersectWithSemiJoinRule +extends RelRule { + +public static final ReplaceIntersectWithSemiJoinRule INSTANCE = + ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig.DEFAULT +.toRule(); + +private ReplaceIntersectWithSemiJoinRule(ReplaceIntersectWithSemiJoinRuleConfig config) { +super(config); +} + +@Override +public boolean matches(RelOptRuleCall call) { +Intersect intersect = call.rel(0); +return !intersect.all && intersect.getInputs().size() == 2; +} + +@Override +public void onMatch(RelOptRuleCall call) { +Intersect intersect = call.rel(0); +RelNode left = intersect.getInput(0); +RelNode right = intersect.getInput(1); + +RelBuilder relBuilder = call.builder(); +List keys = Util.range(left.getRowType().getFieldCount()); +List conditions = generateEqualsCondition(relBuilder, left, right, keys); + +relBuilder.push(left); +relBuilder.push(right); +relBuilder +.join(JoinRelType.SEMI, conditions) +.aggregate( + relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray())); +RelNode rel = relBuilder.build(); +call.transformTo(rel); +} + +/** Rule configuration. 
*/ +@Value.Immutable(singleton = false) +public interface ReplaceIntersectWithSemiJoinRuleConfig extends RelRule.Config { + ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig DEFAULT = + ImmutableReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig +.builder() +.build() +.withOperandSupplier(b0 -> b0.operand(Intersect.class).anyInputs()) +.withRelBuilderFactory(RelFactories.LOGICAL_BUILDER) +.withDescription("ReplaceIntersectWithSemiJoinRule"); Review Comment: left as is, others do it same -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
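The equivalence this rule exploits can be checked on a concrete example: a distinct `INTERSECT` is a SEMI join of the left input against the right, followed by a distinct aggregate. A JDK-only illustration on integer rows (the real rule also has to treat NULLs as equal, which `generateEqualsCondition` handles; plain integers sidestep that here).

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class IntersectAsSemiJoin {
    public static void main(String[] args) {
        List<Integer> left = List.of(1, 2, 2, 3);
        List<Integer> right = List.of(2, 3, 3, 4);

        Set<Integer> rightKeys = new HashSet<>(right); // build side of the semi join
        Set<Integer> result = new LinkedHashSet<>();   // the distinct aggregate on top
        for (Integer row : left) {
            if (rightKeys.contains(row)) {             // SEMI: keep left rows with a match
                result.add(row);
            }
        }
        System.out.println(result); // [2, 3] == left INTERSECT right
    }
}
```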
Re: [PR] [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java. [flink]
liuyongvs commented on code in PR #24423: URL: https://github.com/apache/flink/pull/24423#discussion_r1519068996 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.Util; +import org.immutables.value.Value; + +import java.util.List; + +import static org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition; + +/** + * Planner rule that replaces distinct {@link org.apache.calcite.rel.core.Intersect} with a distinct + * {@link org.apache.calcite.rel.core.Aggregate} on a SEMI {@link org.apache.calcite.rel.core.Join}. Review Comment: @snuyanzin fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1510732371 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * The {@link AbstractSortPartitionOperator} is the base class of sort partition operator, which + * provides shared construction methods and utility functions. + * + * @param The type of input record. + * @param The type used to sort the records, which may be different from the INPUT_TYPE. + * For example, if the input record is sorted according to the selected key by {@link + * KeySelector}, the selected key should also be written to {@link ExternalSorter} with the + * input record to avid repeated key selections. In this case, the type used to sort the records + * will be a tuple containing both the selected key and record. + */ +@Internal +@SuppressWarnings("unchecked") +public abstract class AbstractSortPartitionOperator +extends AbstractStreamOperator +implements OneInputStreamOperator, BoundedOneInput { + +/** The default manage memory weight of sort partition operator. 
*/ +public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128; Review Comment: 1. I think adding a default value is an option. This value is only a weight, which implies that if other operators do not use managed memory, operators of the SortPartition API can utilize all of the managed memory, and the size of the managed memory can be adjusted through configuration. By this means, the budget for managed memory can also be adjusted indirectly. 2. Adding a new configuration for a single API may introduce unnecessary complexity. We can leave this as a pending question. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
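To make the "only a weight" point concrete: the weight does not fix an absolute size but a share of the slot's managed memory, and a sort operator that is the only consumer in its slot gets all of it. A JDK-only arithmetic sketch with invented numbers:

```java
public class ManagedMemoryWeight {
    public static void main(String[] args) {
        long slotManagedMemoryBytes = 512L << 20; // 512 MiB, set via configuration
        int sortWeight = 128;                     // DEFAULT_MANAGE_MEMORY_WEIGHT
        int otherWeight = 64;                     // another managed-memory consumer in the slot

        // Each consumer gets memory proportional to its weight.
        long sortShare = slotManagedMemoryBytes * sortWeight / (sortWeight + otherWeight);
        System.out.println((sortShare >> 20) + " MiB"); // ~341 MiB of 512 MiB

        // With no other consumers, the sort gets the entire budget.
        long soleShare = slotManagedMemoryBytes * sortWeight / sortWeight;
        System.out.println((soleShare >> 20) + " MiB"); // 512 MiB
    }
}
```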
Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]
WencongLiu commented on code in PR #24398: URL: https://github.com/apache/flink/pull/24398#discussion_r1510727272 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java: ## @@ -1071,6 +1077,110 @@ protected SingleOutputStreamOperator aggregate(AggregationFunction aggrega return reduce(aggregate).name("Keyed Aggregation"); } +@Override +public PartitionWindowedStream<T> fullWindowPartition() { +throw new UnsupportedOperationException( +"KeyedStream doesn't support processing non-keyed partitions."); Review Comment: After offline discussion, we all agree that a better version is "KeyedStream doesn't support a full window on partitions; if you want operations over each key, use the methods on KeyedStream directly." In DataStream V1, KeyedStream doesn't have the concept that "all records of the same key form a partition". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch
[ https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825123#comment-17825123 ] Hangxiang Yu commented on FLINK-29114: -- master: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58170=ms.vss-test-web.build-test-results-tab=4035576=115533=debug > TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with > result mismatch > -- > > Key: FLINK-29114 > URL: https://issues.apache.org/jira/browse/FLINK-29114 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Affects Versions: 1.15.0, 1.19.0, 1.20.0 >Reporter: Sergey Nuyanzin >Assignee: Jane Chan >Priority: Major > Labels: auto-deprioritized-major, pull-request-available, > test-stability > Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, > image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png > > > It could be reproduced locally by repeating tests. Usually about 100 > iterations are enough to have several failed tests > {noformat} > [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: > 1.664 s <<< FAILURE! - in > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase > [ERROR] > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse > Time elapsed: 0.108 s <<< FAILURE! > java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:120) > at org.junit.Assert.assertEquals(Assert.java:146) > at > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) > at > org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) > at >
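A minimal sketch of repeating the flaky test locally, as the issue description suggests (illustrative only: it uses JUnit 5's @RepeatedTest and a hypothetical wrapper class, while the original test still runs through the JUnit 4 vintage engine):

{code:java}
import org.junit.jupiter.api.RepeatedTest;

class TableSourceITCaseRepeater {

    // About 100 iterations are usually enough to hit the result mismatch.
    @RepeatedTest(100)
    void repeatTableHintWithLogicalTableScanReuse() {
        // Invoke the production test body here, e.g. by delegating to
        // TableSourceITCase#testTableHintWithLogicalTableScanReuse.
    }
}
{code}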
Re: [PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]
masteryhx commented on PR #24418: URL: https://github.com/apache/flink/pull/24418#issuecomment-1987489513 The failed case is not related to this PR: [FLINK-29114](https://issues.apache.org/jira/browse/FLINK-29114) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34632][runtime/checkpointing] Log checkpoint Id when logging delay [flink]
masteryhx commented on code in PR #24474: URL: https://github.com/apache/flink/pull/24474#discussion_r1519055013 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java: ## @@ -870,7 +870,8 @@ private static void logCheckpointProcessingDelay(CheckpointMetaData checkpointMe long delay = System.currentTimeMillis() - checkpointMetaData.getReceiveTimestamp(); if (delay >= CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS) { LOG.warn( -"Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: {}ms", +"Time from receiving all checkpoint barriers/RPC for checkpoint id {} to executing it exceeded threshold: {}ms", Review Comment: ```suggestion "Time from receiving all checkpoint barriers/RPC for checkpoint {} to executing it exceeded threshold: {}ms", ``` Maybe just use 'checkpoint', as in other log messages. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34585) [JUnit5 Migration] Module: Flink CDC
[ https://issues.apache.org/jira/browse/FLINK-34585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825121#comment-17825121 ] LvYanquan commented on FLINK-34585: --- I am willing to take this. > [JUnit5 Migration] Module: Flink CDC > > > Key: FLINK-34585 > URL: https://issues.apache.org/jira/browse/FLINK-34585 > Project: Flink > Issue Type: Sub-task > Components: Flink CDC >Reporter: Hang Ruan >Priority: Major > > Most tests in Flink CDC are still using Junit 4. We need to use Junit 5 > instead. -- This message was sent by Atlassian Jira (v8.20.10#820010)
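For orientation, a minimal before/after sketch of what such a migration typically involves (illustrative class and method names; the actual CDC tests will differ):

{code:java}
// JUnit 4:
// import org.junit.Before;
// import org.junit.Test;
// import static org.junit.Assert.assertEquals;

// JUnit 5 equivalents:
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class SomeCdcTest {

    @BeforeEach // was @Before in JUnit 4
    void setUp() {}

    @Test // now org.junit.jupiter.api.Test
    void producesOneRecord() {
        assertEquals(1, 1); // placeholder assertion
    }
}
{code}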
Re: [PR] [DRAFT][FLINK-34637][table] Migrate JoinConditionEqualityTransferRule to java [flink]
flinkbot commented on PR #24477: URL: https://github.com/apache/flink/pull/24477#issuecomment-1987441888 ## CI report: * 8d7fa41547e16d601446501f8634f39784c85cea UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34637) Migrate JoinConditionEqualityTransferRule
[ https://issues.apache.org/jira/browse/FLINK-34637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34637: --- Labels: pull-request-available (was: ) > Migrate JoinConditionEqualityTransferRule > - > > Key: FLINK-34637 > URL: https://issues.apache.org/jira/browse/FLINK-34637 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [DRAFT][FLINK-34637][table] Migrate JoinConditionEqualityTransferRule to java [flink]
snuyanzin opened a new pull request, #24477: URL: https://github.com/apache/flink/pull/24477 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34156) Move Flink Calcite rules from Scala to Java
[ https://issues.apache.org/jira/browse/FLINK-34156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825119#comment-17825119 ] Sergey Nuyanzin edited comment on FLINK-34156 at 3/11/24 12:36 AM: --- IMHO, the more people the better for the community, maybe; however, it would also make sense to double-check with them what they are going to do, especially if they are new to the Flink community was (Author: sergey nuyanzin): The more people the better for the community, maybe; however, it would also make sense to double-check with them what they are going to do, especially if they are new to the Flink community > Move Flink Calcite rules from Scala to Java > --- > > Key: FLINK-34156 > URL: https://issues.apache.org/jira/browse/FLINK-34156 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Planner >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Fix For: 2.0.0 > > > This is an umbrella task for the migration of Calcite rules from Scala to Java > mentioned at [https://cwiki.apache.org/confluence/display/FLINK/2.0+Release] > The reason is that since 1.28.0 ( CALCITE-4787 - Move core to use Immutables > instead of ImmutableBeans ) Calcite started to use Immutables > ([https://immutables.github.io/]) and since 1.29.0 removed ImmutableBeans ( > CALCITE-4839 - Remove remnants of ImmutableBeans post 1.28 release ). All > rule-configuration-related API that is not Immutables-based is marked as > deprecated. Since Immutables implies code generation during Java compilation, it > seems impossible to use for rules written in Scala. > We could follow the steps from the javadocs of {{org.apache.calcite.plan.RelRule}} > written for the migration from the deprecated Java API to Immutables. > They would work for the Scala-to-Java migration as well. > Please keep in mind that there is +*no need*+ to migrate rules extending > +ConverterRule+ since these rules do not have this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34156) Move Flink Calcite rules from Scala to Java
[ https://issues.apache.org/jira/browse/FLINK-34156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825119#comment-17825119 ] Sergey Nuyanzin commented on FLINK-34156: - The more people the better for the community, maybe; however, it would also make sense to double-check with them what they are going to do, especially if they are new to the Flink community > Move Flink Calcite rules from Scala to Java > --- > > Key: FLINK-34156 > URL: https://issues.apache.org/jira/browse/FLINK-34156 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Planner >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Fix For: 2.0.0 > > > This is an umbrella task for the migration of Calcite rules from Scala to Java > mentioned at [https://cwiki.apache.org/confluence/display/FLINK/2.0+Release] > The reason is that since 1.28.0 ( CALCITE-4787 - Move core to use Immutables > instead of ImmutableBeans ) Calcite started to use Immutables > ([https://immutables.github.io/]) and since 1.29.0 removed ImmutableBeans ( > CALCITE-4839 - Remove remnants of ImmutableBeans post 1.28 release ). All > rule-configuration-related API that is not Immutables-based is marked as > deprecated. Since Immutables implies code generation during Java compilation, it > seems impossible to use for rules written in Scala. > We could follow the steps from the javadocs of {{org.apache.calcite.plan.RelRule}} > written for the migration from the deprecated Java API to Immutables. > They would work for the Scala-to-Java migration as well. > Please keep in mind that there is +*no need*+ to migrate rules extending > +ConverterRule+ since these rules do not have this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34637) Migrate JoinConditionEqualityTransferRule
Sergey Nuyanzin created FLINK-34637: --- Summary: Migrate JoinConditionEqualityTransferRule Key: FLINK-34637 URL: https://issues.apache.org/jira/browse/FLINK-34637 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Sergey Nuyanzin Assignee: Sergey Nuyanzin -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java. [flink]
snuyanzin commented on code in PR #24423: URL: https://github.com/apache/flink/pull/24423#discussion_r1519025405 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.Util; +import org.immutables.value.Value; + +import java.util.List; + +import static org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition; + +/** + * Planner rule that replaces distinct {@link org.apache.calcite.rel.core.Intersect} with a distinct + * {@link org.apache.calcite.rel.core.Aggregate} on a SEMI {@link org.apache.calcite.rel.core.Join}. + * + * <p>Only handle the case of input size 2. + */ +@Value.Enclosing +public class ReplaceIntersectWithSemiJoinRule +extends RelRule<ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig> { + +public static final ReplaceIntersectWithSemiJoinRule INSTANCE = + ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig.DEFAULT +.toRule(); + +private ReplaceIntersectWithSemiJoinRule(ReplaceIntersectWithSemiJoinRuleConfig config) { +super(config); +} + +@Override +public boolean matches(RelOptRuleCall call) { +Intersect intersect = call.rel(0); +return !intersect.all && intersect.getInputs().size() == 2; +} + +@Override +public void onMatch(RelOptRuleCall call) { +Intersect intersect = call.rel(0); +RelNode left = intersect.getInput(0); +RelNode right = intersect.getInput(1); + +RelBuilder relBuilder = call.builder(); +List<Integer> keys = Util.range(left.getRowType().getFieldCount()); +List<RexNode> conditions = generateEqualsCondition(relBuilder, left, right, keys); + +relBuilder.push(left); +relBuilder.push(right); +relBuilder +.join(JoinRelType.SEMI, conditions) +.aggregate( + relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray())); +RelNode rel = relBuilder.build(); +call.transformTo(rel); +} + +/** Rule configuration. */ +@Value.Immutable(singleton = false) +public interface ReplaceIntersectWithSemiJoinRuleConfig extends RelRule.Config { + ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig DEFAULT = + ImmutableReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig +.builder() +.build() +.withOperandSupplier(b0 -> b0.operand(Intersect.class).anyInputs()) +.withRelBuilderFactory(RelFactories.LOGICAL_BUILDER) +.withDescription("ReplaceIntersectWithSemiJoinRule"); Review Comment: very nit: in fact it could be left as is; however, it could also be moved to the builder, and then no extra objects will be created -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
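For illustration, a sketch of what the reviewer's suggestion might look like, assuming the Immutables-generated builder exposes setters named after the Config attributes (`operandSupplier`, `relBuilderFactory`, `description`); the merged code may differ:

```java
// Hypothetical builder-first variant: every attribute is set while the immutable
// Config is being built, so no intermediate Config copies (one per with* call)
// are created after build().
ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig DEFAULT =
        ImmutableReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig
                .builder()
                .operandSupplier(b0 -> b0.operand(Intersect.class).anyInputs())
                .relBuilderFactory(RelFactories.LOGICAL_BUILDER)
                .description("ReplaceIntersectWithSemiJoinRule")
                .build();
```

Each `with*` call on an already-built immutable value copies the instance, which is why configuring everything inside the builder avoids the extra objects the reviewer mentions.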
Re: [PR] [FLINK-34158][table] Migrate WindowAggregateReduceFunctionsRule to java [flink]
snuyanzin commented on code in PR #24140: URL: https://github.com/apache/flink/pull/24140#discussion_r1519024028 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.java: ## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate; + +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Rule to convert complex aggregation functions into simpler ones. Have a look at + * [[AggregateReduceFunctionsRule]] for details. + */ +public class WindowAggregateReduceFunctionsRule extends AggregateReduceFunctionsRule { +private static final RelBuilderFactory LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE = +RelBuilder.proto( +Contexts.of( +RelFactories.DEFAULT_STRUCT, + RelBuilder.Config.DEFAULT.withPruneInputOfAggregate(false))); + +public static final WindowAggregateReduceFunctionsRule Review Comment: No, unfortunately that's not possible, or at least I don't know how to do it. The problem is that normally the rule would extend `RelRule`, where the custom config class is a type parameter. However, here we extend `AggregateReduceFunctionsRule`, where the config is already defined inside `AggregateReduceFunctionsRule`, and we cannot change it since the class is inside Calcite -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
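To make the constraint concrete, a minimal sketch (simplified names, Calcite side shown only as comments) of why the config type is pinned by the parent class:

```java
import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;

// Why a Flink-defined Immutables config cannot be threaded through here:
// AggregateReduceFunctionsRule fixes its own Config type inside Calcite, roughly
//   class AggregateReduceFunctionsRule extends RelRule<AggregateReduceFunctionsRule.Config>
// so a subclass has no type parameter left to substitute.
public class WindowAggregateReduceFunctionsRuleSketch extends AggregateReduceFunctionsRule {
    protected WindowAggregateReduceFunctionsRuleSketch(Config config) {
        super(config); // Config here can only be AggregateReduceFunctionsRule.Config
    }
}
```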
Re: [PR] [FLINK-34587][table] Introduce MODE aggregate function [flink]
snuyanzin commented on code in PR #24443: URL: https://github.com/apache/flink/pull/24443#discussion_r1519010699 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ModeAggFunctionTest.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.aggfunctions; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.runtime.functions.aggregate.ModeAggFunction; +import org.apache.flink.table.types.logical.VarCharType; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.table.data.StringData.fromString; + +/** Test case for built-in MODE with retraction aggregate function. */ +final class ModeAggFunctionTest +extends AggFunctionTestBase> { + +@Override +protected List<List<StringData>> getInputValueSets() { +return Arrays.asList( +Arrays.asList(fromString("1"), fromString("1"), fromString("3")), +Arrays.asList(fromString("4"), null, fromString("4")), +Arrays.asList(null, null), +Arrays.asList( Review Comment: it seems there was an issue in the test (merge/retract), which requires at least 2 input elements and otherwise fails with an exception. I added a hotfix to skip this test for inputs with fewer than 2 elements, in a similar way to how it is done for cases without a `merge`/`retract` implementation. After that I added this test (merge is skipped, but the others pass) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
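A hedged sketch of such a guard (hypothetical method shape; the actual hotfix may differ), mirroring how tests are already skipped when `merge`/`retract` are not implemented:

```java
import java.util.List;

import static org.junit.jupiter.api.Assumptions.assumeTrue;

class MergeTestGuard {

    // Skip merge/retract verification for inputs that cannot exercise merging,
    // instead of letting the test fail with an exception.
    static void verifyMergeWithRetraction(List<?> inputValues) {
        assumeTrue(
                inputValues.size() >= 2,
                "merge/retract verification needs at least two input elements");
        // ... run the actual merge/retract checks here ...
    }
}
```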
Re: [PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]
dchristle commented on PR #84: URL: https://github.com/apache/flink-connector-pulsar/pull/84#issuecomment-1987392406 @minchowang I submitted a fix in https://github.com/apache/flink-connector-pulsar/pull/85 along with a unit test that reproduces the issue. Replacing `%` with `floorMod` is simpler and keeps the function deterministic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]
dchristle opened a new pull request, #85: URL: https://github.com/apache/flink-connector-pulsar/pull/85 ## Purpose of the change This PR fixes a bug in partition owner logic returning `-1` for the partition owner. The subtask ID to which we try to assign a split should always be within `[0, parallelism-1]`. The bug occurs because a non-partitioned topic has `partitionId = -1` and the hash logic for different topic names will yield `startIndex = 0`. Since the `%` operator can return negative values, the owner ID returned is `-1`. Replacing `%` with `floorMod` fixes this issue. ## Brief change log - Use `floorMod` instead of `%` so that `calculatePartitionOwner` cannot return a negative value. ## Verifying this change This change added tests and can be verified as follows: - Unit test to check that a split is assigned for different non-partitioned topic names and parallelism values. This test fails without the fix. ## Significant changes - [ ] Dependencies have been added or upgraded - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`) - [ ] Serializers have been changed - [ ] New feature has been introduced - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
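To make the arithmetic concrete, here is a standalone sketch (not the connector code itself) of why `%` yields `-1` for a non-partitioned topic and why `Math.floorMod` keeps the owner inside `[0, parallelism-1]`:

```java
public class OwnerDemo {

    // Plain % keeps the sign of the dividend, so startIndex + partitionId == -1
    // maps to owner -1, which is not a valid subtask id.
    static int ownerWithModulo(int startIndex, int partitionId, int parallelism) {
        return (startIndex + partitionId) % parallelism;
    }

    // Math.floorMod always returns a value in [0, parallelism - 1].
    static int ownerWithFloorMod(int startIndex, int partitionId, int parallelism) {
        return Math.floorMod(startIndex + partitionId, parallelism);
    }

    public static void main(String[] args) {
        // Non-partitioned topic: partitionId = -1, and some topic hashes yield startIndex = 0.
        System.out.println(ownerWithModulo(0, -1, 4));   // -1 (split never assigned)
        System.out.println(ownerWithFloorMod(0, -1, 4)); // 3  (valid owner)
    }
}
```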
Re: [PR] [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java. [flink]
snuyanzin commented on code in PR #24423: URL: https://github.com/apache/flink/pull/24423#discussion_r1518998032 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.Util; +import org.immutables.value.Value; + +import java.util.List; + +import static org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition; + +/** + * Planner rule that replaces distinct {@link org.apache.calcite.rel.core.Intersect} with a distinct + * {@link org.apache.calcite.rel.core.Aggregate} on a SEMI {@link org.apache.calcite.rel.core.Join}. Review Comment: nit: is there a reason we use absolute names in the javadocs? In Scala they were relative; why can't we follow the same approach? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
jeyhunkarimov commented on PR #24475: URL: https://github.com/apache/flink/pull/24475#issuecomment-1987359390 Hi @zhuzhurk could you please review the PR in your available time? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34601][table] Migrate of StreamPhysicalConstantTableFunctionScanRule to java [flink]
snuyanzin commented on PR #24449: URL: https://github.com/apache/flink/pull/24449#issuecomment-1987334698 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]
dchristle commented on code in PR #84: URL: https://github.com/apache/flink-connector-pulsar/pull/84#discussion_r1518922858 ## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java: ## @@ -125,11 +127,12 @@ protected int partitionOwner(TopicPartition partition) { @VisibleForTesting static int calculatePartitionOwner(String topic, int partitionId, int parallelism) { -int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % parallelism; +// int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % parallelism; /* * Here, the assumption is that the id of Pulsar partitions are always ascending starting from * 0. Therefore, can be used directly as the offset clockwise from the start index. */ -return (startIndex + partitionId) % parallelism; +// return (startIndex + partitionId) % parallelism; +return idCounter.getAndIncrement() % parallelism; Review Comment: This change would make assignment no longer deterministic from the inputs, since getAndIncrement will increment `idCounter` each time it's called. The use of `hashCode` should already ensure the partitions are somewhat evenly distributed across subtasks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
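A standalone sketch of the determinism point (hypothetical topic name and masking): the hash-based start index answers the same way on every call, while a shared counter answers differently each time it is asked about the same input:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DeterminismDemo {
    static final AtomicInteger idCounter = new AtomicInteger();

    // Hash-based: the same topic always maps to the same start index.
    static int hashBasedOwner(String topic, int parallelism) {
        return (topic.hashCode() * 31 & Integer.MAX_VALUE) % parallelism;
    }

    // Counter-based: the answer drifts on every invocation.
    static int counterBasedOwner(int parallelism) {
        return idCounter.getAndIncrement() % parallelism;
    }

    public static void main(String[] args) {
        System.out.println(hashBasedOwner("my-topic", 4)); // same value...
        System.out.println(hashBasedOwner("my-topic", 4)); // ...on every call
        System.out.println(counterBasedOwner(4));          // 0
        System.out.println(counterBasedOwner(4));          // 1 for the same question
    }
}
```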
[jira] [Updated] (FLINK-34629) Pulsar source lost topic subscribe
[ https://issues.apache.org/jira/browse/FLINK-34629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34629: --- Labels: pull-request-available (was: ) > Pulsar source lost topic subscribe > -- > > Key: FLINK-34629 > URL: https://issues.apache.org/jira/browse/FLINK-34629 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: pulsar-3.0.1 >Reporter: WangMinChao >Priority: Major > Labels: pull-request-available > > The partition id of a non-partitioned Pulsar topic is `-1`; subscribing to multiple > non-partitioned topics in the Pulsar source may therefore lose topic subscriptions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Modify BarrierAlignmentUtilTest comments [flink]
flinkbot commented on PR #24476: URL: https://github.com/apache/flink/pull/24476#issuecomment-1987316215 ## CI report: * 4c7831ad03b847349fba69fe1d45a35852b5fa6c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Modify BarrierAlignmentUtilTest comments [flink]
HCTommy commented on PR #24476: URL: https://github.com/apache/flink/pull/24476#issuecomment-1987315148 Corrected a comment in the BarrierAlignmentUtilTest.java file -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Modify BarrierAlignmentUtilTest comments [flink]
HCTommy opened a new pull request, #24476: URL: https://github.com/apache/flink/pull/24476 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34524] Scale down JM deployment to 0 before deletion [flink-kubernetes-operator]
gyfora commented on PR #791: URL: https://github.com/apache/flink-kubernetes-operator/pull/791#issuecomment-1987310999 The new and improved / consistent logging after the rework @mateczagany @mxm : ``` [INFO ][default/basic-checkpoint-ha-example] >>> Event | Info| CLEANUP | Cleaning up FlinkDeployment [INFO ][default/basic-checkpoint-ha-example] Cleaning up autoscaling meta data [INFO ][default/basic-checkpoint-ha-example] Job is running, cancelling job. [INFO ][default/basic-checkpoint-ha-example] Job successfully cancelled. [INFO ][default/basic-checkpoint-ha-example] Deleting cluster with Foreground propagation [INFO ][default/basic-checkpoint-ha-example] Scaling JobManager Deployment to zero with 300 seconds timeout... [INFO ][default/basic-checkpoint-ha-example] Completed Scaling JobManager Deployment to zero [INFO ][default/basic-checkpoint-ha-example] Deleting JobManager Deployment with 298 seconds timeout... [INFO ][default/basic-checkpoint-ha-example] Completed Deleting JobManager Deployment [INFO ][default/basic-checkpoint-ha-example] Deleting Kubernetes HA metadata ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [WIP][FLINK-32513][core] Add predecessor caching [flink]
flinkbot commented on PR #24475: URL: https://github.com/apache/flink/pull/24475#issuecomment-1987262990 ## CI report: * 89fdb18a4980e46c30683675025736b9ce5fd05d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()
[ https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32513: --- Labels: pull-request-available (was: ) > Job in BATCH mode with a significant number of transformations freezes on > method StreamGraphGenerator.existsUnboundedSource() > - > > Key: FLINK-32513 > URL: https://issues.apache.org/jira/browse/FLINK-32513 > Project: Flink > Issue Type: Bug >Affects Versions: 1.15.3, 1.16.1, 1.17.1 > Environment: All modes (local, k8s session, k8s application, ...) > Flink 1.15.3 > Flink 1.16.1 > Flink 1.17.1 >Reporter: Vladislav Keda >Priority: Critical > Labels: pull-request-available > Attachments: image-2023-07-10-17-26-46-544.png > > > Flink job executed in BATCH mode with a significant number of transformations > (more than 30 in my case) takes very long time to start due to the method > StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of > the method, a lot of memory is consumed, which causes the GC to fire > frequently. > Thread Dump: > {code:java} > "main@1" prio=5 tid=0x1 nid=NA runnable > java.lang.Thread.State: RUNNABLE > at java.util.ArrayList.addAll(ArrayList.java:702) > at > org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95) > at > org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > 
org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95) > at > org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at > org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174) > at >
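The thread dump above shows the blow-up: each `getTransitivePredecessors()` call recursively recomputes the predecessor lists of its inputs, so deep DAGs with joins explode combinatorially. A minimal sketch of the memoization idea suggested by the PR title (field and helper names are hypothetical, not the actual patch):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Sketch of a Transformation-like node that memoizes its transitive predecessors,
// so repeated calls do not re-walk the DAG (the source of the freeze above).
abstract class CachingTransformation {

    private List<CachingTransformation> cachedTransitivePredecessors;

    public List<CachingTransformation> getTransitivePredecessors() {
        if (cachedTransitivePredecessors == null) {
            List<CachingTransformation> result = new ArrayList<>();
            result.add(this);
            for (CachingTransformation input : getInputs()) {
                result.addAll(input.getTransitivePredecessors()); // each input is cached too
            }
            cachedTransitivePredecessors = result;
        }
        return cachedTransitivePredecessors;
    }

    protected abstract List<CachingTransformation> getInputs();
}
{code}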
[PR] [WIP][FLINK-32513][core] Add predecessor caching [flink]
jeyhunkarimov opened a new pull request, #24475: URL: https://github.com/apache/flink/pull/24475 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34524] Scale down JM deployment to 0 before deletion [flink-kubernetes-operator]
gyfora commented on code in PR #791: URL: https://github.com/apache/flink-kubernetes-operator/pull/791#discussion_r1518872217 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java: ## @@ -356,4 +353,32 @@ public boolean scalingCompleted(FlinkResourceContext ctx) { throw new RuntimeException(e); } } + +private long scaleJmToZero( +EditReplacePatchable<Deployment> jmDeployment, +String namespace, +String clusterId, +long timeoutMillis) { +LOG.info("Scaling down JobManager Deployment to zero before deletion"); +long startTime = System.currentTimeMillis(); +try { +// We use patching instead of scaling to avoid the need for new permissions +var patch = new DeploymentBuilder().editOrNewSpec().withReplicas(0).endSpec().build(); +jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), patch); +kubernetesClient +.pods() +.inNamespace(namespace) + .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)) +.waitUntilCondition( +Objects::isNull, + operatorConfig.getFlinkShutdownClusterTimeout().getSeconds(), Review Comment: fixed, factored out the shared logic -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34334][state] Add sub-task level RocksDB file count metrics [flink]
hejufang commented on code in PR #24322: URL: https://github.com/apache/flink/pull/24322#discussion_r1518860909 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBProperty.java: ## @@ -31,55 +31,86 @@ */ @Internal public enum RocksDBProperty { -NumImmutableMemTable("num-immutable-mem-table"), -MemTableFlushPending("mem-table-flush-pending"), -CompactionPending("compaction-pending"), -BackgroundErrors("background-errors"), -CurSizeActiveMemTable("cur-size-active-mem-table"), -CurSizeAllMemTables("cur-size-all-mem-tables"), -SizeAllMemTables("size-all-mem-tables"), -NumEntriesActiveMemTable("num-entries-active-mem-table"), -NumEntriesImmMemTables("num-entries-imm-mem-tables"), -NumDeletesActiveMemTable("num-deletes-active-mem-table"), -NumDeletesImmMemTables("num-deletes-imm-mem-tables"), -EstimateNumKeys("estimate-num-keys"), -EstimateTableReadersMem("estimate-table-readers-mem"), -NumSnapshots("num-snapshots"), -NumLiveVersions("num-live-versions"), -EstimateLiveDataSize("estimate-live-data-size"), -TotalSstFilesSize("total-sst-files-size"), -LiveSstFilesSize("live-sst-files-size"), -EstimatePendingCompactionBytes("estimate-pending-compaction-bytes"), -NumRunningCompactions("num-running-compactions"), -NumRunningFlushes("num-running-flushes"), -ActualDelayedWriteRate("actual-delayed-write-rate"), -IsWriteStopped("is-write-stopped"), -BlockCacheCapacity("block-cache-capacity"), -BlockCacheUsage("block-cache-usage"), -BlockCachePinnedUsage("block-cache-pinned-usage"); +NumImmutableMemTable("num-immutable-mem-table", "number"), +MemTableFlushPending("mem-table-flush-pending", "number"), +CompactionPending("compaction-pending", "number"), +BackgroundErrors("background-errors", "number"), +CurSizeActiveMemTable("cur-size-active-mem-table", "number"), +CurSizeAllMemTables("cur-size-all-mem-tables", "number"), +SizeAllMemTables("size-all-mem-tables", "number"), +NumEntriesActiveMemTable("num-entries-active-mem-table", "number"), +NumEntriesImmMemTables("num-entries-imm-mem-tables", "number"), +NumDeletesActiveMemTable("num-deletes-active-mem-table", "number"), +NumDeletesImmMemTables("num-deletes-imm-mem-tables", "number"), +EstimateNumKeys("estimate-num-keys", "number"), +EstimateTableReadersMem("estimate-table-readers-mem", "number"), +NumSnapshots("num-snapshots", "number"), +NumLiveVersions("num-live-versions", "number"), +EstimateLiveDataSize("estimate-live-data-size", "number"), +TotalSstFilesSize("total-sst-files-size", "number"), +LiveSstFilesSize("live-sst-files-size", "number"), +EstimatePendingCompactionBytes("estimate-pending-compaction-bytes", "number"), +NumRunningCompactions("num-running-compactions", "number"), +NumRunningFlushes("num-running-flushes", "number"), +ActualDelayedWriteRate("actual-delayed-write-rate", "number"), +IsWriteStopped("is-write-stopped", "number"), +BlockCacheCapacity("block-cache-capacity", "number"), +BlockCacheUsage("block-cache-usage", "number"), +BlockCachePinnedUsage("block-cache-pinned-usage", "number"), +NumFilesAtLevel("num-files-at-level", "string"); private static final String ROCKS_DB_PROPERTY_FORMAT = "rocksdb.%s"; private static final String CONFIG_KEY_FORMAT = "state.backend.rocksdb.metrics.%s"; -private final String property; +private final String propertyName; -RocksDBProperty(String property) { -this.property = property; +private final String type; Review Comment: @Zakelly Thanks for your reminder, I have changed it to an enum class. Please review.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
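For illustration, a sketch of the enum-typed shape the reviewer asked for (hypothetical names; the merged code may differ):

```java
/** Property value types exposed by RocksDB, instead of bare "number"/"string" literals. */
enum PropertyType {
    NUMBER,
    STRING
}

enum RocksDBPropertySketch {
    EstimateNumKeys("estimate-num-keys", PropertyType.NUMBER),
    NumFilesAtLevel("num-files-at-level", PropertyType.STRING);

    private final String propertyName;
    private final PropertyType type;

    RocksDBPropertySketch(String propertyName, PropertyType type) {
        this.propertyName = propertyName;
        this.type = type;
    }
}
```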
[jira] [Commented] (FLINK-32367) lead function second param cause ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-32367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825031#comment-17825031 ] Jeyhun Karimov commented on FLINK-32367: Hi [~zhou_yb] when I tried to reproduce this with the current master (d6a4eb966fbc47277e07b79e7c64939a62eb1d54), the LEAD function successfully accepts and executes an expression as its parameter. For example, the following query works: {code:java} SELECT a, b, lead(b, a/2, 3) over (partition by a order by b), lag(b, 1, 3) over (partition by a order by b) FROM Table6{code} Could you please verify, or am I missing something? > lead function second param cause ClassCastException > --- > > Key: FLINK-32367 > URL: https://issues.apache.org/jira/browse/FLINK-32367 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.15.4 >Reporter: zhou >Priority: Major > Attachments: image-2023-06-16-15-49-49-003.png, > image-2023-06-16-18-12-05-861.png > > > !image-2023-06-16-18-12-05-861.png!!image-2023-06-16-15-49-49-003.png! > when the lead function's second param is an expression (window_length/2), it throws an exception > if the lead function's second param is a number, it works well -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33579) Join sql error
[ https://issues.apache.org/jira/browse/FLINK-33579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825027#comment-17825027 ] Jeyhun Karimov commented on FLINK-33579: Hi [~waywtdcc] when I tried to reproduce locally as of the current master (d6a4eb966fbc47277e07b79e7c64939a62eb1d54), the above-mentioned exception seems to be gone. Could you please verify? > Join sql error > -- > > Key: FLINK-33579 > URL: https://issues.apache.org/jira/browse/FLINK-33579 > Project: Flink > Issue Type: Bug >Affects Versions: 1.17.1 >Reporter: waywtdcc >Priority: Major > > > {code:java} > set pipeline.operator-chaining=true; > set execution.runtime-mode=BATCH; > set table.exec.disabled-operators = NestedLoopJoin; > explain plan for > select > * > from > orders, > supplier, > customer > where > c_custkey = o_custkey and > c_nationkey = s_nationkey {code} > > > > error: > {code:java} > org.apache.flink.table.api.TableException: Cannot generate a valid execution > plan for the given query: > > FlinkLogicalJoin(condition=[AND(=($21, $2), =($24, $15))], joinType=[inner]) > :- FlinkLogicalJoin(condition=[true], joinType=[inner]) > : :- FlinkLogicalTableSourceScan(table=[[paimon, tpch100g_paimon, orders]], > fields=[uuid, o_orderkey, o_custkey, o_orderstatus, o_totalprice, > o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, ts]) > : +- FlinkLogicalTableSourceScan(table=[[paimon, tpch100g_paimon, > supplier]], fields=[uuid, s_suppkey, s_name, s_address, s_nationkey, s_phone, > s_acctbal, s_comment, ts]) > +- FlinkLogicalTableSourceScan(table=[[paimon, tpch100g_paimon, customer]], > fields=[uuid, c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, > c_mktsegment, c_comment, ts]) > > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. 
> > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45) > at scala.collection.immutable.List.foreach(List.scala:388) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:329) > at > org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:541) > at > org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:115) > at > org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:47) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:620) > at > org.apache.flink.table.api.internal.TableEnvironmentInternal.explainInternal(TableEnvironmentInternal.java:96) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1296) > at >
[jira] [Updated] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered
[ https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vincent Woo updated FLINK-34636:

Description:
Based on observation of logs and metrics, subtasks deployed on the same TM consistently reported a timeout exception when requesting exclusive buffers. During the restart process, the {*}Network{*} metric remained unchanged (heap memory usage did change). I suspect that the network buffer memory was not properly released during the restart, which caused the newly deployed tasks to fail to obtain network buffers. This problem persisted despite repeated restarts, and the application could not recover automatically. (I'm not sure whether there are other causes for this issue.)

Attached below are screenshots of the exception stack and relevant metrics:
{code:java}
2024-03-08 09:58:18,738 WARN org.apache.flink.runtime.taskmanager.Task [] - GroupWindowAggregate switched from DEPLOYING to FAILED with failure cause: java.io.IOException: Timeout triggered when requesting exclusive buffers: The total number of network buffers is currently set to 32768 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max', or you may increase the timeout which is 3ms by setting the key 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
{code}
!image-20240308101407396.png|width=866,height=171!

Network metric: only this TM stays at 100%, without any variation.
!image-20240308100308649.png|width=868,height=338!

Tasks deployed to this TM cannot reach RUNNING, and their status transitions are slow.
!image-20240308101008765.png|width=869,height=118!

Although the root exception surfaced by the application is PartitionNotFoundException, the underlying root cause found in the logs is IOException: Timeout triggered when requesting exclusive buffers.
!image-20240308101934756.png|width=869,height=394!
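For readers hitting the same exception: the keys quoted in the message can be adjusted either in flink-conf.yaml or programmatically. Below is a minimal sketch with illustrative values only; the keys are taken verbatim from the exception, but the values are assumptions and depend on the deployment:

{code:java}
import org.apache.flink.configuration.Configuration;

public class NetworkBufferConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Keys quoted in the exception message; values here are illustrative assumptions.
        conf.setString("taskmanager.memory.network.fraction", "0.2"); // fraction of total Flink memory reserved for network buffers
        conf.setString("taskmanager.memory.network.min", "128mb");    // lower bound of the network memory pool
        conf.setString("taskmanager.memory.network.max", "2gb");      // upper bound of the network memory pool

        // Give slow-to-release buffers more time before the exclusive-buffer request fails.
        conf.setString("taskmanager.network.memory.exclusive-buffers-request-timeout-ms", "60000");

        System.out.println(conf);
    }
}
{code}

Note that raising the request timeout only masks the symptom if buffers are genuinely never returned to the pool; the fraction/min/max keys control how large the shared pool is in the first place.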
[jira] [Created] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered
Vincent Woo created FLINK-34636:
---
Summary: Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered
Key: FLINK-34636
URL: https://issues.apache.org/jira/browse/FLINK-34636
Project: Flink
Issue Type: Bug
Components: Runtime / Network
Reporter: Vincent Woo
Attachments: image-20240308100308649.png, image-20240308101008765.png, image-20240308101407396.png, image-20240308101934756.png

Based on observation of logs and metrics, subtasks deployed on the same TM consistently reported a timeout exception when requesting exclusive buffers. During the restart process, the {*}Network{*} metric remained unchanged (heap memory usage did change). I suspect that the network buffer memory was not properly released during the restart, which caused the newly deployed tasks to fail to obtain network buffers. This problem persisted despite repeated restarts, and the application could not recover automatically. (I'm not sure whether there are other causes for this issue.)

Attached below are screenshots of the exception stack and relevant metrics:
{code:java}
2024-03-08 09:58:18,738 WARN org.apache.flink.runtime.taskmanager.Task [] - GroupWindowAggregate switched from DEPLOYING to FAILED with failure cause: java.io.IOException: Timeout triggered when requesting exclusive buffers: The total number of network buffers is currently set to 32768 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max', or you may increase the timeout which is 3ms by setting the key 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
{code}
!image-20240308101407396.png!

Network metric: only this TM stays at 100%, without any variation.
!image-20240308100308649.png|width=2540,height=989!

Tasks deployed to this TM cannot reach RUNNING, and their status transitions are slow.
!image-20240308101008765.png!

Although the root exception surfaced by the application is PartitionNotFoundException, the underlying root cause found in the logs is IOException: Timeout triggered when requesting exclusive buffers.
!image-20240308101934756.png!

-- This message was sent by Atlassian Jira (v8.20.10#820010)
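As a sanity check on the figures quoted in the log, 32768 buffers of 32768 bytes each amount to exactly 1 GiB of network memory. A small self-contained sketch of the arithmetic (plain Java, no Flink dependency):

{code:java}
public class NetworkBufferMath {
    public static void main(String[] args) {
        long numBuffers = 32_768;       // "total number of network buffers" from the log
        long segmentSizeBytes = 32_768; // "of 32768 bytes each" from the log

        // 32768 * 32768 = 2^30 = 1,073,741,824 bytes = exactly 1 GiB.
        long totalBytes = numBuffers * segmentSizeBytes;
        System.out.printf("network memory: %d bytes (%.0f GiB)%n",
                totalBytes, totalBytes / (1024.0 * 1024 * 1024));

        // If buffers from a failed attempt were never returned to the pool, every
        // redeployment would compete for the same exhausted 1 GiB, which would be
        // consistent with the reporter's observation of the Network metric pinned at 100%.
    }
}
{code}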