[jira] [Commented] (FLINK-34562) Port Debezium Avro Confluent changes (FLINK-34509) to Chinese

2024-03-10 Thread Vincent Woo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825154#comment-17825154
 ] 

Vincent Woo commented on FLINK-34562:
-

I'm willing to contribute; could someone assign this to me?

> Port Debezium Avro Confluent changes (FLINK-34509) to Chinese
> -
>
> Key: FLINK-34562
> URL: https://issues.apache.org/jira/browse/FLINK-34562
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Reporter: Lorenzo Affetti
>Priority: Minor
>
> Port the changes applied in 
> [https://issues.apache.org/jira/projects/FLINK/issues/FLINK-34509] to the 
> Chinese version of the documentation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34638) Support default value of table column.

2024-03-10 Thread LvYanquan (Jira)
LvYanquan created FLINK-34638:
-

 Summary: Support default value of table column.
 Key: FLINK-34638
 URL: https://issues.apache.org/jira/browse/FLINK-34638
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: LvYanquan
 Fix For: cdc-3.1.0


Support default values of table columns that are literal constants.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery [flink]

2024-03-10 Thread via GitHub


ljz2051 commented on code in PR #24402:
URL: https://github.com/apache/flink/pull/24402#discussion_r1519145995


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##
@@ -219,6 +219,18 @@ public TaskStateSnapshot retrieveLocalState(long 
checkpointID) {
 snapshot = loadTaskStateSnapshot(checkpointID);
 }
 
+// Even if local recovery is disabled, it is still necessary to load 
the TaskStateSnapshot
+// so that it can be managed by the TaskLocalStateStore.
+if (!localRecoveryConfig.isLocalRecoveryEnabled()) {

Review Comment:
   It makes sense. I have changed the log level from "info" to "debug".
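   
   A minimal sketch of the intent (the exact control flow and message are assumptions, not the PR's code):
   
   ```java
   if (!localRecoveryConfig.isLocalRecoveryEnabled()) {
       // Keep the loaded TaskStateSnapshot under the store's management even
       // though it will not be used for local recovery; log quietly at debug.
       LOG.debug(
               "Loaded local state snapshot for checkpoint {} although local recovery is disabled.",
               checkpointID);
   }
   ```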



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34484][state] Split 'state.backend.local-recovery' into two options for checkpointing and recovery [flink]

2024-03-10 Thread via GitHub


ljz2051 commented on code in PR #24402:
URL: https://github.com/apache/flink/pull/24402#discussion_r1519145206


##
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java:
##
@@ -35,7 +35,7 @@
 import java.util.Collection;
 import java.util.Collections;
 
-import static 
org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
+import static 
org.apache.flink.configuration.StateRecoveryOptions.LOCAL_RECOVERY;

Review Comment:
   Yeah, I have noticed it, and @fredia is fixing it in FLINK-34624.
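   
   For context, a minimal usage sketch after the move (assuming `LOCAL_RECOVERY` remains a boolean `ConfigOption`):
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.configuration.StateRecoveryOptions;
   
   Configuration config = new Configuration();
   // The recovery-side flag now lives in StateRecoveryOptions.
   config.set(StateRecoveryOptions.LOCAL_RECOVERY, true);
   ```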



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]

2024-03-10 Thread via GitHub


minchowang commented on PR #84:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/84#issuecomment-1987561329

   > @minchowang I submitted a fix in #85 along with a unit test that 
reproduces the issue. Replacing `%` with `floorMod` is simpler and keeps the 
function deterministic.
   
   I have tried using Math.abs(), but then one subtask is never assigned.
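   
   For reference, a self-contained sketch of the mapping arithmetic (illustrative values only):
   
   ```java
   int parallelism = 3;
   
   // Plain % keeps the sign of the dividend: a negative hash maps outside [0, parallelism).
   int viaRemainder = -7 % parallelism;              // -1
   
   // floorMod is total and deterministic: the result is always within [0, parallelism).
   int viaFloorMod = Math.floorMod(-7, parallelism); // 2
   
   // Math.abs overflows on Integer.MIN_VALUE, so an abs-based mapping
   // can still produce a negative index.
   int edgeCase = Math.abs(Integer.MIN_VALUE) % parallelism; // -2
   ```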


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [doc]Fix wrong function name [flink]

2024-03-10 Thread via GitHub


flinkbot commented on PR #24478:
URL: https://github.com/apache/flink/pull/24478#issuecomment-1987525147

   
   ## CI report:
   
   * 0301a286157fc86105ca53cd0007c458bbc75d4a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1519078269


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * The {@link PartitionReduceOperator} is used to apply the reduce transformation on all records
+ * of each partition. Each partition contains all records of a subtask.
+ */
+@Internal
+public class PartitionReduceOperator<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
+        implements OneInputStreamOperator<IN, IN>, BoundedOneInput {
+
+    private final ReduceFunction<IN> reduceFunction;
+
+    private IN currentRecord = null;
+
+    private long lastWatermarkTimestamp = Long.MIN_VALUE;
+
+    public PartitionReduceOperator(ReduceFunction<IN> reduceFunction) {
+        super(reduceFunction);
+        this.reduceFunction = reduceFunction;
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<IN>> output) {
+        super.setup(containingTask, config, output);
+    }
+
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        if (currentRecord == null) {
+            currentRecord = element.getValue();

Review Comment:
   The job will turn off checkpointing while operators with `OutputOnlyAfterEndOfStream=true` are running, which means those operators won't support checkpoints.
   
   BTW, operators on a keyed stream with `OutputOnlyAfterEndOfStream=true` will be given sorted input, which doesn't support checkpoints either.
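   
   For context, a sketch of how an operator declares that attribute (assuming the `OperatorAttributes`/`OperatorAttributesBuilder` API quoted elsewhere in this PR):
   
   ```java
   @Override
   public OperatorAttributes getOperatorAttributes() {
       // Declaring that output is only produced after the end of input lets
       // the runtime disable checkpointing while this operator runs.
       return new OperatorAttributesBuilder()
               .setOutputOnlyAfterEndOfStream(true)
               .build();
   }
   ```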



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [doc]Fix wrong function name [flink]

2024-03-10 Thread via GitHub


wslkxup opened a new pull request, #24478:
URL: https://github.com/apache/flink/pull/24478

   The function name LOCA_LTIME does not exist; it should be LOCALTIME.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1519076085


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/PartitionReduceOperator.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * The {@link PartitionReduceOperator} is used to apply the reduce transformation on all records
+ * of each partition. Each partition contains all records of a subtask.
+ */
+@Internal
+public class PartitionReduceOperator<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
+        implements OneInputStreamOperator<IN, IN>, BoundedOneInput {
+
+    private final ReduceFunction<IN> reduceFunction;
+
+    private IN currentRecord = null;
+
+    private long lastWatermarkTimestamp = Long.MIN_VALUE;
+
+    public PartitionReduceOperator(ReduceFunction<IN> reduceFunction) {
+        super(reduceFunction);
+        this.reduceFunction = reduceFunction;
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<IN>> output) {
+        super.setup(containingTask, config, output);
+    }

Review Comment:
   This method is redundant. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1519075588


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalAsyncIteratorImpl.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link InternalAsyncIterator}. */
+@Internal
+public class InternalAsyncIteratorImpl<IN> implements InternalAsyncIterator<IN> {

Review Comment:
   I've refactored the implementation of the iterator for MapPartitionFunction and made the logic of the code clearer. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1519075382


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MapPartitionOperator.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.ExceptionUtils;
+
+/**
+ * The {@link MapPartitionOperator} is used to process all records in each partition on a
+ * non-keyed stream. Each partition contains all records of a subtask.
+ */
+@Internal
+public class MapPartitionOperator<IN, OUT>
+        extends AbstractUdfStreamOperator<OUT, MapPartitionFunction<IN, OUT>>
+        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
+
+    private final MapPartitionFunction<IN, OUT> function;
+
+    private long lastWatermarkTimestamp = Long.MIN_VALUE;
+
+    private InternalAsyncIterator<IN> iterator;
+
+    public MapPartitionOperator(MapPartitionFunction<IN, OUT> function) {
+        super(function);
+        this.function = function;
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<OUT>> output) {
+        super.setup(containingTask, config, output);
+        this.iterator = new InternalAsyncIteratorImpl<>();
+        this.iterator.registerUDF(
+                iterator -> {
+                    TimestampedCollector<OUT> outputCollector = new TimestampedCollector<>(output);
+                    try {
+                        function.mapPartition(() -> iterator, outputCollector);

Review Comment:
   I've set the operator to be `ChainingStrategy.NEVER`.
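   
   A minimal sketch of that change (assuming it is applied in the operator's constructor; `setChainingStrategy` is inherited from `AbstractStreamOperator`):
   
   ```java
   public MapPartitionOperator(MapPartitionFunction<IN, OUT> function) {
       super(function);
       this.function = function;
       // Never chain this operator, so its record handoff to the UDF thread
       // stays isolated from neighboring operators.
       setChainingStrategy(ChainingStrategy.NEVER);
   }
   ```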



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed

2024-03-10 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825132#comment-17825132
 ] 

Hongshun Wang commented on FLINK-34634:
---

CC [~ruanhang1993], [~renqs], [~Leonard]

> Restarting the job will not read the changelog anymore if it stops before the 
> synchronization of meta information is complete and some table is removed
> ---
>
> Key: FLINK-34634
> URL: https://issues.apache.org/jira/browse/FLINK-34634
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.1.0
>
> Attachments: image-2024-03-09-15-25-26-187.png, 
> image-2024-03-09-15-27-46-073.png
>
>
> h3. What's the problem
> Once, I removed a table from the option and then restarted the job from the 
> savepoint, but the job couldn't read the binlog anymore. When I checked the 
> logs, I found an ERROR-level log stating:
> ' The enumerator received invalid request meta group id 6, the valid meta 
> group id range is [0, 4].'
> It appears that the Reader is requesting more splits than the Enumerator is 
> aware of.
> However, the code should indeed remove redundant split information from the 
> Reader as seen in 
> [https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does 
> this issue occur?
>  
> h3. Why it occurs
> !image-2024-03-09-15-25-26-187.png|width=751,height=329!
> Upon examining the code, I discovered the cause: this issue occurs if the job stops before 
> completing the synchronization of all the split meta information and then restarts. Suppose the 
> totalFinishedSplitSize of the binlogSplit in the Reader is 6, and no meta information has been 
> synchronized yet, leaving the finishedSnapshotSplitInfos of the binlogSplit in the Reader empty. 
> After restarting, the totalFinishedSplitSize of the binlogSplit in the Reader equals (6 - (0 - 
> 0)), which is still 6, but in the Enumerator it is only 4 (the removed table had two splits). 
> This can lead to an out-of-range request.
> !image-2024-03-09-15-27-46-073.png|width=755,height=305!
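> In code form, the mismatch looks roughly like this (a sketch with the numbers from the example above, not the actual CDC classes):
> {code:java}
> // Reader state restored after the restart:
> int totalFinishedSplitSize = 6 - (0 - 0); // still 6, nothing had been synchronized
> int enumeratorFinishedSplits = 4;         // the removed table accounted for 2 splits
> // The reader derives meta group ids from its own total, so it can request a
> // group id beyond the enumerator's valid range, producing
> // "received invalid request meta group id ..., the valid meta group id range is [0, ...]".
> {code}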
> h3. How to reproduce
>  * Add Thread.sleep(1000L) in 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents
>  to postpone split meta infos synchronization.
> {code:java}
> public void handleSourceEvents(SourceEvent sourceEvent) {
> // ... preceding branches elided ...
> else if (sourceEvent instanceof BinlogSplitMetaEvent) {
> LOG.debug(
> "Source reader {} receives binlog meta with group id {}.",
> subtaskId,
> ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
> try {
> Thread.sleep(1000L);
> } catch (InterruptedException e) {
> throw new RuntimeException(e);
> }
> fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
> } {code}
>  * Add Thread.sleep(500L) in 
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne
>  to trigger savepoint before meta infos synchronization finishes.
>  
> {code:java}
> // step 2: execute insert and trigger savepoint with all tables added
> {
> // ... ignore 
> waitForSinkSize("sink", fetchedDataList.size());
> Thread.sleep(500L);
> assertEqualsInAnyOrder(fetchedDataList, 
> TestValuesTableFactory.getRawResults("sink"));
> finishedSavePointPath = triggerSavepointWithRetry(jobClient, 
> savepointDirectory);
> jobClient.cancel().get();
> }
> // test removing table one by one, note that there should be at least one 
> table remaining
> for (int round = 0; round < captureAddressTables.length - 1; round++) {
> ...
> }
> {code}
>  
>  * Add chunk-meta.group.size = 2 in 
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement
> Then, run the test 
> (com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable),
>  and the error log will occur.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1519074254


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MapPartitionOperator.java:
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.ExceptionUtils;
+
+/**
+ * The {@link MapPartitionOperator} is used to process all records in each partition on a
+ * non-keyed stream. Each partition contains all records of a subtask.
+ */
+@Internal
+public class MapPartitionOperator<IN, OUT>
+        extends AbstractUdfStreamOperator<OUT, MapPartitionFunction<IN, OUT>>
+        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
+
+    private final MapPartitionFunction<IN, OUT> function;
+
+    private long lastWatermarkTimestamp = Long.MIN_VALUE;
+
+    private InternalAsyncIterator<IN> iterator;
+
+    public MapPartitionOperator(MapPartitionFunction<IN, OUT> function) {
+        super(function);
+        this.function = function;
+    }
+
+    @Override
+    public void setup(
Review Comment:
   The `open` method is invoked later in the lifecycle than `setup`, and `setup` currently initializes the components needed for the operator's execution, so that initialization is more appropriate in the `open` method. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1519073380


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalAsyncIteratorImpl.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link InternalAsyncIterator}. */
+@Internal
+public class InternalAsyncIteratorImpl<IN> implements InternalAsyncIterator<IN> {
+
+    /**
+     * Max number of caches.
+     *
+     * <p>The constant defines the maximum number of caches that can be created. Its value is set
+     * to 100, which is considered sufficient for most parallel jobs. Each cache is a record and
+     * occupies a minimal amount of memory so the value is not excessively large.
+     */
+    private static final int DEFAULT_MAX_CACHE_NUM = 100;
+
+    /** The lock to ensure consistency between task main thread and udf executor. */
+    private final Lock lock = new ReentrantLock();
+
+    /** The condition of lock. */
+    private final Condition condition = lock.newCondition();
+
+    /** The task udf executor. */
+    private final Executor udfExecutor =
+            Executors.newFixedThreadPool(1, new ExecutorThreadFactory("TaskUDFExecutor"));
+
+    /** The queue to store record caches. */
+    @GuardedBy("lock")
+    private final Queue<IN> recordCaches = new LinkedList<>();
+
+    /** The flag to represent the finished state of udf. */
+    @GuardedBy("lock")
+    private boolean isUDFFinished = false;
+
+    /** The flag to represent the closed state of this iterator. */
+    @GuardedBy("lock")
+    private boolean isClosed = false;
+
+    @Override
+    public boolean hasNext() {
+        return supplyWithLock(
+                () -> {
+                    if (recordCaches.size() > 0) {
+                        return true;
+                    } else if (isClosed) {
+                        return false;
+                    } else {
+                        waitToGetStatus();
+                        return hasNext();
+                    }
+                });
+    }
+
+    @Override
+    public IN next() {
+        return supplyWithLock(
+                () -> {
+                    IN record;
+                    if (recordCaches.size() > 0) {
+                        record = recordCaches.poll();
+                        if (!isClosed) {
+                            notifyStatus();
+                        }
+                        return record;
+                    }
+                    waitToGetStatus();
+                    if (recordCaches.size() == 0) {
+                        checkState(isClosed);
+                        return null;
+                    }
+                    return recordCaches.poll();
+                });
+    }
+
+    @Override
+    public void registerUDF(Consumer<Iterator<IN>> udf) {
+        checkState(!isClosed);

Review Comment:
   I've refactored the implementation of the iterator for MapPartitionFunction and made the logic of the code clearer. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Draft PR [flink]

2024-03-10 Thread via GitHub


1996fanrui closed pull request #24393: Draft PR
URL: https://github.com/apache/flink/pull/24393


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1519071934


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalAsyncIteratorImpl.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The default implementation of {@link InternalAsyncIterator}. */
+@Internal
+public class InternalAsyncIteratorImpl<IN> implements InternalAsyncIterator<IN> {
+
+    /**
+     * Max number of caches.
+     *
+     * <p>The constant defines the maximum number of caches that can be created. Its value is set
+     * to 100, which is considered sufficient for most parallel jobs. Each cache is a record and
+     * occupies a minimal amount of memory so the value is not excessively large.
+     */
+    private static final int DEFAULT_MAX_CACHE_NUM = 100;
+
+    /** The lock to ensure consistency between task main thread and udf executor. */
+    private final Lock lock = new ReentrantLock();
+
+    /** The condition of lock. */
+    private final Condition condition = lock.newCondition();
+
+    /** The task udf executor. */
+    private final Executor udfExecutor =
+            Executors.newFixedThreadPool(1, new ExecutorThreadFactory("TaskUDFExecutor"));

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34152] Add an option to scale memory when downscaling [flink-kubernetes-operator]

2024-03-10 Thread via GitHub


1996fanrui commented on PR #786:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/786#issuecomment-1987512556

   > @1996fanrui I'm merging but I'll take any comments from your side that you might have reviewing this later on.
   
   Thanks @mxm for the improvement and the ping! Sorry for the late response here; I just finished my vacation. I will take a look in the next two days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1519071465


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java:
##
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.MapPartitionOperator;
+import org.apache.flink.streaming.api.operators.PartitionAggregateOperator;
+import org.apache.flink.streaming.api.operators.PartitionReduceOperator;
+import 
org.apache.flink.streaming.api.operators.sortpartition.SortPartitionOperator;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+
+import static 
org.apache.flink.streaming.api.operators.sortpartition.SortPartitionOperator.DEFAULT_SORTPARTITION_MANAGE_MEMORY_WEIGHT;
+
+/**
+ * {@link PartitionWindowedStream} represents a data stream that collects records of each
+ * partition into a separate full window. Each partition contains all records of a subtask. The
+ * window emission will be triggered at the end of input.
+ *
+ * @param <T> The type of the elements in this stream.
+ */
+@PublicEvolving
+public class PartitionWindowedStream<T> {
+
+    private final StreamExecutionEnvironment environment;
+
+    private final DataStream<T> input;
+
+    public PartitionWindowedStream(StreamExecutionEnvironment environment, DataStream<T> input) {
+        this.environment = environment;
+        this.input = input;
+    }
+
+    /**
+     * Process the records of the window by {@link MapPartitionFunction}.
+     *
+     * @param mapPartitionFunction The map partition function.
+     * @param <OUT> The type of map partition result.
+     * @return The data stream with map partition result.
+     */
+    public <OUT> SingleOutputStreamOperator<OUT> mapPartition(
+            MapPartitionFunction<T, OUT> mapPartitionFunction) {
+        if (mapPartitionFunction == null) {
+            throw new NullPointerException("The map partition function must not be null.");
+        }

Review Comment:
   Good point. I've added `checkNotNull` in similar positions.
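   
   For illustration, the guard expressed with Flink's `Preconditions` helper (a sketch; the message text is assumed):
   
   ```java
   import static org.apache.flink.util.Preconditions.checkNotNull;
   
   // Fails fast with a NullPointerException carrying the given message.
   checkNotNull(mapPartitionFunction, "The map partition function must not be null.");
   ```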



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1519070714


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * The {@link AbstractSortPartitionOperator} is the base class of the sort partition operators,
+ * which provides shared construction methods and utility functions.
+ *
+ * @param <INPUT_TYPE> The type of input record.
+ * @param <SORT_TYPE> The type used to sort the records, which may be different from the
+ *     INPUT_TYPE. For example, if the input record is sorted according to the selected key by
+ *     {@link KeySelector}, the selected key should also be written to {@link ExternalSorter} with
+ *     the input record to avoid repeated key selections. In this case, the type used to sort the
+ *     records will be a tuple containing both the selected key and record.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+public abstract class AbstractSortPartitionOperator<INPUT_TYPE, SORT_TYPE>
+        extends AbstractStreamOperator<INPUT_TYPE>
+        implements OneInputStreamOperator<INPUT_TYPE, INPUT_TYPE>, BoundedOneInput {
+
+    /** The default managed memory weight of the sort partition operator. */
+    public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128;

Review Comment:
   I've added two config options to configure the managed memory weight for the SortPartition API on `KeyedPartitionWindowedStream` and `NonKeyedPartitionWindowedStream`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31663) Add ARRAY_EXCEPT supported in SQL & Table API

2024-03-10 Thread Jacky Lau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825131#comment-17825131
 ] 

Jacky Lau commented on FLINK-31663:
---

Hi [~snuyanzin], what is your opinion?

> Add ARRAY_EXCEPT supported in SQL & Table API
> -
>
> Key: FLINK-31663
> URL: https://issues.apache.org/jira/browse/FLINK-31663
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34556][table] Migrate EnumerableToLogicalTableScan to java. [flink]

2024-03-10 Thread via GitHub


liuyongvs commented on PR #24421:
URL: https://github.com/apache/flink/pull/24421#issuecomment-1987508497

   Hi @snuyanzin, will you also help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34503][table] Migrate JoinDeriveNullFilterRule to java. [flink]

2024-03-10 Thread via GitHub


liuyongvs commented on PR #24373:
URL: https://github.com/apache/flink/pull/24373#issuecomment-1987508136

   Hi @snuyanzin, will you also help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java. [flink]

2024-03-10 Thread via GitHub


liuyongvs commented on code in PR #24423:
URL: https://github.com/apache/flink/pull/24423#discussion_r1519069215


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Util;
+import org.immutables.value.Value;
+
+import java.util.List;
+
+import static 
org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition;
+
+/**
+ * Planner rule that replaces distinct {@link org.apache.calcite.rel.core.Intersect} with a
+ * distinct {@link org.apache.calcite.rel.core.Aggregate} on a SEMI {@link
+ * org.apache.calcite.rel.core.Join}.
+ *
+ * <p>Only handles the case of input size 2.
+ */
+@Value.Enclosing
+public class ReplaceIntersectWithSemiJoinRule
+        extends RelRule<ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig> {
+
+    public static final ReplaceIntersectWithSemiJoinRule INSTANCE =
+            ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig.DEFAULT
+                    .toRule();
+
+    private ReplaceIntersectWithSemiJoinRule(ReplaceIntersectWithSemiJoinRuleConfig config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        Intersect intersect = call.rel(0);
+        return !intersect.all && intersect.getInputs().size() == 2;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        Intersect intersect = call.rel(0);
+        RelNode left = intersect.getInput(0);
+        RelNode right = intersect.getInput(1);
+
+        RelBuilder relBuilder = call.builder();
+        List<Integer> keys = Util.range(left.getRowType().getFieldCount());
+        List<RexNode> conditions = generateEqualsCondition(relBuilder, left, right, keys);
+
+        relBuilder.push(left);
+        relBuilder.push(right);
+        relBuilder
+                .join(JoinRelType.SEMI, conditions)
+                .aggregate(
+                        relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray()));
+        RelNode rel = relBuilder.build();
+        call.transformTo(rel);
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface ReplaceIntersectWithSemiJoinRuleConfig extends RelRule.Config {
+        ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig DEFAULT =
+                ImmutableReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig
+                        .builder()
+                        .build()
+                        .withOperandSupplier(b0 -> b0.operand(Intersect.class).anyInputs())
+                        .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                        .withDescription("ReplaceIntersectWithSemiJoinRule");

Review Comment:
   Left as is; the other rules do the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java. [flink]

2024-03-10 Thread via GitHub


liuyongvs commented on code in PR #24423:
URL: https://github.com/apache/flink/pull/24423#discussion_r1519068996


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Util;
+import org.immutables.value.Value;
+
+import java.util.List;
+
+import static 
org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition;
+
+/**
+ * Planner rule that replaces distinct {@link org.apache.calcite.rel.core.Intersect} with a
+ * distinct {@link org.apache.calcite.rel.core.Aggregate} on a SEMI {@link
+ * org.apache.calcite.rel.core.Join}.

Review Comment:
   @snuyanzin Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1510732371


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * The {@link AbstractSortPartitionOperator} is the base class of the sort partition operators,
+ * which provides shared construction methods and utility functions.
+ *
+ * @param <INPUT_TYPE> The type of input record.
+ * @param <SORT_TYPE> The type used to sort the records, which may be different from the
+ *     INPUT_TYPE. For example, if the input record is sorted according to the selected key by
+ *     {@link KeySelector}, the selected key should also be written to {@link ExternalSorter} with
+ *     the input record to avoid repeated key selections. In this case, the type used to sort the
+ *     records will be a tuple containing both the selected key and record.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+public abstract class AbstractSortPartitionOperator<INPUT_TYPE, SORT_TYPE>
+        extends AbstractStreamOperator<INPUT_TYPE>
+        implements OneInputStreamOperator<INPUT_TYPE, INPUT_TYPE>, BoundedOneInput {
+
+    /** The default managed memory weight of the sort partition operator. */
+    public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128;

Review Comment:
   1. I think adding a default value is an option. This value is only a weight, which implies that if other operators do not use managed memory, operators of the SortPartition API can utilize all of it; since the size of the managed memory can be adjusted through configuration, the budget for managed memory can also be adjusted indirectly.
   2. Adding a new configuration for a single API may introduce unnecessary complexity.
   
   We can leave this issue here as a pending question, as sketched below. WDYT?
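   
   For reference, a sketch of how such a weight reaches the runtime (the weight value is illustrative; `declareManagedMemoryUseCaseAtOperatorScope` is the existing `Transformation` API):
   
   ```java
   // Operators share operator-scope managed memory in proportion to their
   // declared weights, so a weight only matters relative to other operators.
   transformation.declareManagedMemoryUseCaseAtOperatorScope(
           ManagedMemoryUseCase.OPERATOR, /* weight */ 128);
   ```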



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34543][datastream]Support Full Partition Processing On Non-keyed DataStream [flink]

2024-03-10 Thread via GitHub


WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1510727272


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java:
##
@@ -1071,6 +1077,110 @@ protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggrega
         return reduce(aggregate).name("Keyed Aggregation");
     }
 
+    @Override
+    public PartitionWindowedStream<T> fullWindowPartition() {
+        throw new UnsupportedOperationException(
+                "KeyedStream doesn't support processing non-keyed partitions.");

Review Comment:
   After offline discussion, we all agree that a better version is: "KeyedStream doesn't support a full window on partitions; if you want operations over each key, use the methods on KeyedStream directly." In DataStream V1, the KeyedStream doesn't have the concept that "all records of the same key form a partition".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch

2024-03-10 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825123#comment-17825123
 ] 

Hangxiang Yu commented on FLINK-29114:
--

master: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58170=ms.vss-test-web.build-test-results-tab=4035576=115533=debug

> TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with 
> result mismatch 
> --
>
> Key: FLINK-29114
> URL: https://issues.apache.org/jira/browse/FLINK-29114
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.15.0, 1.19.0, 1.20.0
>Reporter: Sergey Nuyanzin
>Assignee: Jane Chan
>Priority: Major
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
> Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, 
> image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png
>
>
> It could be reproduced locally by repeating tests. Usually about 100 
> iterations are enough to have several failed tests
> {noformat}
> [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 1.664 s <<< FAILURE! - in 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase
> [ERROR] 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse
>   Time elapsed: 0.108 s  <<< FAILURE!
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)>
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.failNotEquals(Assert.java:835)
>     at org.junit.Assert.assertEquals(Assert.java:120)
>     at org.junit.Assert.assertEquals(Assert.java:146)
>     at 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>     at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>     at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>     at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>     at 
> 

Re: [PR] [FLINK-32076][checkpoint] Introduce file pool to reuse files [flink]

2024-03-10 Thread via GitHub


masteryhx commented on PR #24418:
URL: https://github.com/apache/flink/pull/24418#issuecomment-1987489513

   The failed case is not related to this PR: 
[FLINK-29114](https://issues.apache.org/jira/browse/FLINK-29114)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34632][runtime/checkpointing] Log checkpoint Id when logging delay [flink]

2024-03-10 Thread via GitHub


masteryhx commented on code in PR #24474:
URL: https://github.com/apache/flink/pull/24474#discussion_r1519055013


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##
@@ -870,7 +870,8 @@ private static void 
logCheckpointProcessingDelay(CheckpointMetaData checkpointMe
 long delay = System.currentTimeMillis() - 
checkpointMetaData.getReceiveTimestamp();
 if (delay >= CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS) {
 LOG.warn(
-"Time from receiving all checkpoint barriers/RPC to 
executing it exceeded threshold: {}ms",
+"Time from receiving all checkpoint barriers/RPC for 
checkpoint id {} to executing it exceeded threshold: {}ms",

Review Comment:
   ```suggestion
   "Time from receiving all checkpoint barriers/RPC for 
checkpoint {} to executing it exceeded threshold: {}ms",
   ```
   Maybe just use 'checkpoint', to stay consistent with the other log messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34585) [JUnit5 Migration] Module: Flink CDC

2024-03-10 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825121#comment-17825121
 ] 

LvYanquan commented on FLINK-34585:
---

I am willing to take this.

> [JUnit5 Migration] Module: Flink CDC
> 
>
> Key: FLINK-34585
> URL: https://issues.apache.org/jira/browse/FLINK-34585
> Project: Flink
>  Issue Type: Sub-task
>  Components: Flink CDC
>Reporter: Hang Ruan
>Priority: Major
>
> Most tests in Flink CDC are still using Junit 4. We need to use Junit 5 
> instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [DRAFT][FLINK-34637][table] Migrate JoinConditionEqualityTransferRule to java [flink]

2024-03-10 Thread via GitHub


flinkbot commented on PR #24477:
URL: https://github.com/apache/flink/pull/24477#issuecomment-1987441888

   
   ## CI report:
   
   * 8d7fa41547e16d601446501f8634f39784c85cea UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34637) Migrate JoinConditionEqualityTransferRule

2024-03-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34637:
---
Labels: pull-request-available  (was: )

> Migrate JoinConditionEqualityTransferRule
> -
>
> Key: FLINK-34637
> URL: https://issues.apache.org/jira/browse/FLINK-34637
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [DRAFT][FLINK-34637][table] Migrate JoinConditionEqualityTransferRule to java [flink]

2024-03-10 Thread via GitHub


snuyanzin opened a new pull request, #24477:
URL: https://github.com/apache/flink/pull/24477

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34156) Move Flink Calcite rules from Scala to Java

2024-03-10 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825119#comment-17825119
 ] 

Sergey Nuyanzin edited comment on FLINK-34156 at 3/11/24 12:36 AM:
---

IMHO, the more people the better for the community.

Maybe, however, it would also make sense to double-check with them what they are 
going to do, especially if they are new to the Flink community.


was (Author: sergey nuyanzin):
the more people the better for the community 

may be however it would also make sense to double check with them what they are 
going to do, especially if they are new to Flink community

> Move Flink Calcite rules from Scala to Java
> ---
>
> Key: FLINK-34156
> URL: https://issues.apache.org/jira/browse/FLINK-34156
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
> Fix For: 2.0.0
>
>
> This is an umbrella task for migration of Calcite rules from Scala to Java 
> mentioned at [https://cwiki.apache.org/confluence/display/FLINK/2.0+Release]
> The reason is that since 1.28.0 ( CALCITE-4787 - Move core to use Immutables 
> instead of ImmutableBeans ) Calcite started to use Immutables 
> ([https://immutables.github.io/]) and since 1.29.0 removed ImmutableBeans ( 
> CALCITE-4839 - Remove remnants of ImmutableBeans post 1.28 release ). All 
> rule-configuration-related API which is not Immutables-based is marked as 
> deprecated. Since Immutables implies code generation during Java compilation, 
> it seems impossible to use for rules written in Scala.
> We could follow the steps from the javadocs of {{org.apache.calcite.plan.RelRule}} 
> written for the migration from the deprecated Java API to Immutables. 
> It would work for the Scala-to-Java migration as well.
> Please keep in mind that there is +*no need*+ to migrate rules extending 
> +ConverterRule+, since these rules do not have this problem.
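
A minimal sketch of the Immutables-based configuration pattern that the 
{{RelRule}} javadocs describe (the rule and config names here are made up for 
illustration; the shape mirrors already-migrated Flink rules):

{code:java}
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelNode;
import org.immutables.value.Value;

/** Illustrative rule using the Immutables-based config pattern. */
@Value.Enclosing
public class MyMigratedRule extends RelRule<MyMigratedRule.MyMigratedRuleConfig> {

    private MyMigratedRule(MyMigratedRuleConfig config) {
        super(config);
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        // the actual rewrite logic goes here
    }

    /** Rule configuration; ImmutableMyMigratedRule is generated at compile time. */
    @Value.Immutable
    public interface MyMigratedRuleConfig extends RelRule.Config {
        MyMigratedRuleConfig DEFAULT =
                ImmutableMyMigratedRule.MyMigratedRuleConfig.builder()
                        .build()
                        .withOperandSupplier(b -> b.operand(RelNode.class).anyInputs());

        @Override
        default MyMigratedRule toRule() {
            return new MyMigratedRule(this);
        }
    }
}
{code}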



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34156) Move Flink Calcite rules from Scala to Java

2024-03-10 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825119#comment-17825119
 ] 

Sergey Nuyanzin commented on FLINK-34156:
-

The more people the better for the community.

Maybe, however, it would also make sense to double-check with them what they are 
going to do, especially if they are new to the Flink community.

> Move Flink Calcite rules from Scala to Java
> ---
>
> Key: FLINK-34156
> URL: https://issues.apache.org/jira/browse/FLINK-34156
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
> Fix For: 2.0.0
>
>
> This is an umbrella task for migration of Calcite rules from Scala to Java 
> mentioned at [https://cwiki.apache.org/confluence/display/FLINK/2.0+Release]
> The reason is that since 1.28.0 ( CALCITE-4787 - Move core to use Immutables 
> instead of ImmutableBeans ) Calcite started to use Immutables 
> ([https://immutables.github.io/]) and since 1.29.0 removed ImmutableBeans ( 
> CALCITE-4839 - Remove remnants of ImmutableBeans post 1.28 release ). All 
> rule-configuration-related API which is not Immutables-based is marked as 
> deprecated. Since Immutables implies code generation during Java compilation, 
> it seems impossible to use for rules written in Scala.
> We could follow the steps from the javadocs of {{org.apache.calcite.plan.RelRule}} 
> written for the migration from the deprecated Java API to Immutables. 
> It would work for the Scala-to-Java migration as well.
> Please keep in mind that there is +*no need*+ to migrate rules extending 
> +ConverterRule+, since these rules do not have this problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34637) Migrate JoinConditionEqualityTransferRule

2024-03-10 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-34637:
---

 Summary: Migrate JoinConditionEqualityTransferRule
 Key: FLINK-34637
 URL: https://issues.apache.org/jira/browse/FLINK-34637
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java. [flink]

2024-03-10 Thread via GitHub


snuyanzin commented on code in PR #24423:
URL: https://github.com/apache/flink/pull/24423#discussion_r1519025405


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Util;
+import org.immutables.value.Value;
+
+import java.util.List;
+
+import static 
org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition;
+
+/**
+ * Planner rule that replaces distinct {@link 
org.apache.calcite.rel.core.Intersect} with a distinct
+ * {@link org.apache.calcite.rel.core.Aggregate} on a SEMI {@link 
org.apache.calcite.rel.core.Join}.
+ *
+ * Only handle the case of input size 2.
+ */
+@Value.Enclosing
+public class ReplaceIntersectWithSemiJoinRule
+extends RelRule<ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig> {
+
+public static final ReplaceIntersectWithSemiJoinRule INSTANCE =
+
ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig.DEFAULT
+.toRule();
+
+private 
ReplaceIntersectWithSemiJoinRule(ReplaceIntersectWithSemiJoinRuleConfig config) 
{
+super(config);
+}
+
+@Override
+public boolean matches(RelOptRuleCall call) {
+Intersect intersect = call.rel(0);
+return !intersect.all && intersect.getInputs().size() == 2;
+}
+
+@Override
+public void onMatch(RelOptRuleCall call) {
+Intersect intersect = call.rel(0);
+RelNode left = intersect.getInput(0);
+RelNode right = intersect.getInput(1);
+
+RelBuilder relBuilder = call.builder();
+List<Integer> keys = Util.range(left.getRowType().getFieldCount());
+List<RexNode> conditions = generateEqualsCondition(relBuilder, left, right, keys);
+
+relBuilder.push(left);
+relBuilder.push(right);
+relBuilder
+.join(JoinRelType.SEMI, conditions)
+.aggregate(
+
relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray()));
+RelNode rel = relBuilder.build();
+call.transformTo(rel);
+}
+
+/** Rule configuration. */
+@Value.Immutable(singleton = false)
+public interface ReplaceIntersectWithSemiJoinRuleConfig extends 
RelRule.Config {
+
ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig DEFAULT 
=
+
ImmutableReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig
+.builder()
+.build()
+.withOperandSupplier(b0 -> 
b0.operand(Intersect.class).anyInputs())
+.withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+.withDescription("ReplaceIntersectWithSemiJoinRule");

Review Comment:
   very nit: in fact could be left as is
   however it also could be moved to builder and then the will not be created 
extra objects



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34158][table] Migrate WindowAggregateReduceFunctionsRule to java [flink]

2024-03-10 Thread via GitHub


snuyanzin commented on code in PR #24140:
URL: https://github.com/apache/flink/pull/24140#discussion_r1519024028


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate;
+
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Rule to convert complex aggregation functions into simpler ones. Have a 
look at
+ * [[AggregateReduceFunctionsRule]] for details.
+ */
+public class WindowAggregateReduceFunctionsRule extends 
AggregateReduceFunctionsRule {
+private static final RelBuilderFactory 
LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE =
+RelBuilder.proto(
+Contexts.of(
+RelFactories.DEFAULT_STRUCT,
+
RelBuilder.Config.DEFAULT.withPruneInputOfAggregate(false)));
+
+public static final WindowAggregateReduceFunctionsRule

Review Comment:
   No, unfortunately that's not possible, or at least I don't know how to do it.
   
   The problem is that normally the rule should extend `RelRule`, where the 
custom config class is a type parameter. However, here we extend 
`AggregateReduceFunctionsRule`, where the config is already defined inside 
`AggregateReduceFunctionsRule`, and we cannot change it since that class lives 
inside Calcite.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34587][table] Introduce MODE aggregate function [flink]

2024-03-10 Thread via GitHub


snuyanzin commented on code in PR #24443:
URL: https://github.com/apache/flink/pull/24443#discussion_r1519010699


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ModeAggFunctionTest.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.functions.aggregate.ModeAggFunction;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.data.StringData.fromString;
+
+/** Test case for built-in MODE with retraction aggregate function. */
+final class ModeAggFunctionTest
+extends AggFunctionTestBase> {
+
+@Override
+protected List<List<StringData>> getInputValueSets() {
+return Arrays.asList(
+Arrays.asList(fromString("1"), fromString("1"), 
fromString("3")),
+Arrays.asList(fromString("4"), null, fromString("4")),
+Arrays.asList(null, null),
+Arrays.asList(

Review Comment:
   It seems there was an issue in the (merge/retract) test, which requires at 
least 2 input elements and otherwise fails with an exception.
   I added a hotfix to skip this test for inputs with fewer than 2 elements, in 
a similar way to how it is done for cases without a `merge`/`retract` 
implementation; a minimal sketch of the guard is shown below.
   
   After that I added this test (merge is skipped, however the others pass).
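   
   A minimal sketch of such a guard, assuming a JUnit 5 style assumption (the 
method and `inputValues` names are illustrative, not the actual 
`AggFunctionTestBase` API):
   
   ```java
   import java.util.List;
   
   import static org.junit.jupiter.api.Assumptions.assumeTrue;
   
   // Illustrative guard; `inputValues` stands in for one input value set.
   static void assumeMergeRetractApplicable(List<?> inputValues) {
       assumeTrue(
               inputValues.size() >= 2,
               "merge/retract tests require at least 2 input elements");
   }
   ```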



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]

2024-03-10 Thread via GitHub


dchristle commented on PR #84:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/84#issuecomment-1987392406

   @minchowang I submitted a fix in 
https://github.com/apache/flink-connector-pulsar/pull/85 along with a unit test 
that reproduces the issue. Replacing `%` with `floorMod` is simpler and keeps 
the function deterministic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]

2024-03-10 Thread via GitHub


dchristle opened a new pull request, #85:
URL: https://github.com/apache/flink-connector-pulsar/pull/85

   
   
   ## Purpose of the change
   This PR fixes a bug in the partition owner logic that can return `-1` as the 
partition owner. The subtask ID to which we try to assign a split should always 
be within `[0, parallelism-1]`. The bug occurs because a non-partitioned topic 
has `partitionId = -1`, and the hash logic for some topic names yields 
`startIndex = 0`. Since the `%` operator can return negative values, the owner 
ID returned is `-1`. Replacing `%` with `floorMod` fixes this issue.
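   
   A small, self-contained demonstration of the failure mode and the fix (the 
values here are chosen for illustration):
   
   ```java
   public class FloorModDemo {
       public static void main(String[] args) {
           int parallelism = 4;
           int startIndex = 0;   // a topic name whose hash maps to start index 0
           int partitionId = -1; // Pulsar marks non-partitioned topics with -1
   
           // Java's % keeps the sign of the dividend, so the result can be negative:
           int owner = (startIndex + partitionId) % parallelism;
           System.out.println(owner); // -1 -> no subtask in [0, 3] owns the split
   
           // Math.floorMod always returns a value in [0, parallelism):
           int fixedOwner = Math.floorMod(startIndex + partitionId, parallelism);
           System.out.println(fixedOwner); // 3 -> a valid subtask index
       }
   }
   ```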
   
   ## Brief change log
   
   - Use `floorMod` instead of `%` so that `calculatePartitionOwner` cannot 
return a negative value.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Unit test to check that a split is assigned for different non-partitioned 
topic names and parallelism values. This test fails without the fix.
   
   
   ## Significant changes
   
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
   - If yes, how is this documented? (not applicable / docs / JavaDocs / 
not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java. [flink]

2024-03-10 Thread via GitHub


snuyanzin commented on code in PR #24423:
URL: https://github.com/apache/flink/pull/24423#discussion_r1518998032


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Util;
+import org.immutables.value.Value;
+
+import java.util.List;
+
+import static 
org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition;
+
+/**
+ * Planner rule that replaces distinct {@link 
org.apache.calcite.rel.core.Intersect} with a distinct
+ * {@link org.apache.calcite.rel.core.Aggregate} on a SEMI {@link 
org.apache.calcite.rel.core.Join}.

Review Comment:
   nit: is there a reason we use absolute names in the javadocs?
   In Scala they were relative; why can't we follow the same approach? See the 
short illustration below.
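   
   For clarity, the difference in question (illustrative snippets only):
   
   ```java
   // absolute name spelled out in the javadoc:
   /** Replaces a distinct {@link org.apache.calcite.rel.core.Intersect} ... */
   
   // relative name, resolved via the regular import:
   import org.apache.calcite.rel.core.Intersect;
   /** Replaces a distinct {@link Intersect} ... */
   ```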



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-10 Thread via GitHub


jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-1987359390

   Hi @zhuzhurk, could you please review the PR in your available time? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34601][table] Migrate of StreamPhysicalConstantTableFunctionScanRule to java [flink]

2024-03-10 Thread via GitHub


snuyanzin commented on PR #24449:
URL: https://github.com/apache/flink/pull/24449#issuecomment-1987334698

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34629] Fix non-partition topic subscribe lost. [flink-connector-pulsar]

2024-03-10 Thread via GitHub


dchristle commented on code in PR #84:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/84#discussion_r1518922858


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java:
##
@@ -125,11 +127,12 @@ protected int partitionOwner(TopicPartition partition) {
 
 @VisibleForTesting
 static int calculatePartitionOwner(String topic, int partitionId, int 
parallelism) {
-int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % parallelism;
+// int startIndex = ((topic.hashCode() * 31) & 0x7FFF) % 
parallelism;
 /*
  * Here, the assumption is that the id of Pulsar partitions are always 
ascending starting from
  * 0. Therefore, can be used directly as the offset clockwise from the 
start index.
  */
-return (startIndex + partitionId) % parallelism;
+// return (startIndex + partitionId) % parallelism;
+return idCounter.getAndIncrement() % parallelism;

Review Comment:
   This change would make assignment no longer deterministic from the inputs, 
since getAndIncrement will increment `idCounter` each time it's called. The use 
of `hashCode` should already ensure the partitions are somewhat evenly 
distributed across subtasks.
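   
   A standalone sketch of the difference (illustrative, not the connector code):
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class OwnerDeterminismDemo {
       private static final AtomicInteger idCounter = new AtomicInteger();
   
       // mutates shared state: asking twice about the same split gives two answers
       static int counterBasedOwner(int parallelism) {
           return idCounter.getAndIncrement() % parallelism;
       }
   
       // pure function of its inputs: same topic/partition, same owner, every time
       static int hashBasedOwner(String topic, int partitionId, int parallelism) {
           int startIndex = Math.floorMod(topic.hashCode() * 31, parallelism);
           return Math.floorMod(startIndex + partitionId, parallelism);
       }
   
       public static void main(String[] args) {
           System.out.println(counterBasedOwner(4)); // 0
           System.out.println(counterBasedOwner(4)); // 1 -- same inputs, new owner
           System.out.println(hashBasedOwner("t", -1, 4)); // stable value
           System.out.println(hashBasedOwner("t", -1, 4)); // same value again
       }
   }
   ```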



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34629) Pulsar source lost topic subscribe

2024-03-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34629:
---
Labels: pull-request-available  (was: )

> Pulsar source lost topic subscribe
> --
>
> Key: FLINK-34629
> URL: https://issues.apache.org/jira/browse/FLINK-34629
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-3.0.1
>Reporter: WangMinChao
>Priority: Major
>  Labels: pull-request-available
>
> The partition id of a non-partitioned Pulsar topic is `-1`; when using 
> multiple non-partitioned topics 
> in the Pulsar source, topic subscriptions may be lost.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Modify BarrierAlignmentUtilTest comments [flink]

2024-03-10 Thread via GitHub


flinkbot commented on PR #24476:
URL: https://github.com/apache/flink/pull/24476#issuecomment-1987316215

   
   ## CI report:
   
   * 4c7831ad03b847349fba69fe1d45a35852b5fa6c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Modify BarrierAlignmentUtilTest comments [flink]

2024-03-10 Thread via GitHub


HCTommy commented on PR #24476:
URL: https://github.com/apache/flink/pull/24476#issuecomment-1987315148

   Corrected a comment in the [BarrierAlignmentUtilTest.java] file.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Modify BarrierAlignmentUtilTest comments [flink]

2024-03-10 Thread via GitHub


HCTommy opened a new pull request, #24476:
URL: https://github.com/apache/flink/pull/24476

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34524] Scale down JM deployment to 0 before deletion [flink-kubernetes-operator]

2024-03-10 Thread via GitHub


gyfora commented on PR #791:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/791#issuecomment-1987310999

   The new and improved / consistent logging after the rework @mateczagany @mxm 
:
   
   ```
   [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Info| CLEANUP  
   | Cleaning up FlinkDeployment
   [INFO ][default/basic-checkpoint-ha-example] Cleaning up autoscaling meta 
data
   [INFO ][default/basic-checkpoint-ha-example] Job is running, cancelling job.
   [INFO ][default/basic-checkpoint-ha-example] Job successfully cancelled.
   [INFO ][default/basic-checkpoint-ha-example] Deleting cluster with 
Foreground propagation
   [INFO ][default/basic-checkpoint-ha-example] Scaling JobManager Deployment 
to zero with 300 seconds timeout...
   [INFO ][default/basic-checkpoint-ha-example] Completed Scaling JobManager 
Deployment to zero
   [INFO ][default/basic-checkpoint-ha-example] Deleting JobManager Deployment 
with 298 seconds timeout...
   [INFO ][default/basic-checkpoint-ha-example] Completed Deleting JobManager 
Deployment
   [INFO ][default/basic-checkpoint-ha-example] Deleting Kubernetes HA metadata
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP][FLINK-32513][core] Add predecessor caching [flink]

2024-03-10 Thread via GitHub


flinkbot commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-1987262990

   
   ## CI report:
   
   * 89fdb18a4980e46c30683675025736b9ce5fd05d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32513) Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource()

2024-03-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32513:
---
Labels: pull-request-available  (was: )

> Job in BATCH mode with a significant number of transformations freezes on 
> method StreamGraphGenerator.existsUnboundedSource()
> -
>
> Key: FLINK-32513
> URL: https://issues.apache.org/jira/browse/FLINK-32513
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.3, 1.16.1, 1.17.1
> Environment: All modes (local, k8s session, k8s application, ...)
> Flink 1.15.3
> Flink 1.16.1
> Flink 1.17.1
>Reporter: Vladislav Keda
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-07-10-17-26-46-544.png
>
>
> A Flink job executed in BATCH mode with a significant number of transformations 
> (more than 30 in my case) takes a very long time to start due to the method 
> StreamGraphGenerator.existsUnboundedSource(). Also, during the execution of 
> the method, a lot of memory is consumed, which causes the GC to fire 
> frequently.
> Thread Dump:
> {code:java}
> "main@1" prio=5 tid=0x1 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at java.util.ArrayList.addAll(ArrayList.java:702)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:224)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.PartitionTransformation.getTransitivePredecessors(PartitionTransformation.java:95)
>       at 
> org.apache.flink.streaming.api.transformations.TwoInputTransformation.getTransitivePredecessors(TwoInputTransformation.java:223)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> org.apache.flink.streaming.api.transformations.OneInputTransformation.getTransitivePredecessors(OneInputTransformation.java:174)
>       at 
> 
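
The repeated getTransitivePredecessors frames above re-walk the same subtrees 
again and again. A minimal sketch of the caching idea (the class, field and 
method names are illustrative, not the actual Flink implementation):

{code:java}
import java.util.ArrayList;
import java.util.List;

public abstract class CachingTransformation {
    // computed once on first access, then reused by every later call
    private List<CachingTransformation> predecessorsCache;

    public final List<CachingTransformation> getTransitivePredecessors() {
        if (predecessorsCache == null) {
            predecessorsCache = new ArrayList<>(getTransitivePredecessorsInternal());
        }
        return predecessorsCache;
    }

    // subclasses walk their inputs here; with the cache above, each
    // transformation's subtree is traversed at most once
    protected abstract List<CachingTransformation> getTransitivePredecessorsInternal();
}
{code}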

[PR] [WIP][FLINK-32513][core] Add predecessor caching [flink]

2024-03-10 Thread via GitHub


jeyhunkarimov opened a new pull request, #24475:
URL: https://github.com/apache/flink/pull/24475

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34524] Scale down JM deployment to 0 before deletion [flink-kubernetes-operator]

2024-03-10 Thread via GitHub


gyfora commented on code in PR #791:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/791#discussion_r1518872217


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java:
##
@@ -356,4 +353,32 @@ public boolean scalingCompleted(FlinkResourceContext 
ctx) {
 throw new RuntimeException(e);
 }
 }
+
+private long scaleJmToZero(
+EditReplacePatchable<Deployment> jmDeployment,
+String namespace,
+String clusterId,
+long timeoutMillis) {
+LOG.info("Scaling down JobManager Deployment to zero before deletion");
+long startTime = System.currentTimeMillis();
+try {
+// We use patching instead of scaling to avoid the need for new 
permissions
+var patch = new 
DeploymentBuilder().editOrNewSpec().withReplicas(0).endSpec().build();
+jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), patch);
+kubernetesClient
+.pods()
+.inNamespace(namespace)
+
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
+.waitUntilCondition(
+Objects::isNull,
+
operatorConfig.getFlinkShutdownClusterTimeout().getSeconds(),

Review Comment:
   fixed, factored out the shared logic



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34334][state] Add sub-task level RocksDB file count metrics [flink]

2024-03-10 Thread via GitHub


hejufang commented on code in PR #24322:
URL: https://github.com/apache/flink/pull/24322#discussion_r1518860909


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBProperty.java:
##
@@ -31,55 +31,86 @@
  */
 @Internal
 public enum RocksDBProperty {
-NumImmutableMemTable("num-immutable-mem-table"),
-MemTableFlushPending("mem-table-flush-pending"),
-CompactionPending("compaction-pending"),
-BackgroundErrors("background-errors"),
-CurSizeActiveMemTable("cur-size-active-mem-table"),
-CurSizeAllMemTables("cur-size-all-mem-tables"),
-SizeAllMemTables("size-all-mem-tables"),
-NumEntriesActiveMemTable("num-entries-active-mem-table"),
-NumEntriesImmMemTables("num-entries-imm-mem-tables"),
-NumDeletesActiveMemTable("num-deletes-active-mem-table"),
-NumDeletesImmMemTables("num-deletes-imm-mem-tables"),
-EstimateNumKeys("estimate-num-keys"),
-EstimateTableReadersMem("estimate-table-readers-mem"),
-NumSnapshots("num-snapshots"),
-NumLiveVersions("num-live-versions"),
-EstimateLiveDataSize("estimate-live-data-size"),
-TotalSstFilesSize("total-sst-files-size"),
-LiveSstFilesSize("live-sst-files-size"),
-EstimatePendingCompactionBytes("estimate-pending-compaction-bytes"),
-NumRunningCompactions("num-running-compactions"),
-NumRunningFlushes("num-running-flushes"),
-ActualDelayedWriteRate("actual-delayed-write-rate"),
-IsWriteStopped("is-write-stopped"),
-BlockCacheCapacity("block-cache-capacity"),
-BlockCacheUsage("block-cache-usage"),
-BlockCachePinnedUsage("block-cache-pinned-usage");
+NumImmutableMemTable("num-immutable-mem-table", "number"),
+MemTableFlushPending("mem-table-flush-pending", "number"),
+CompactionPending("compaction-pending", "number"),
+BackgroundErrors("background-errors", "number"),
+CurSizeActiveMemTable("cur-size-active-mem-table", "number"),
+CurSizeAllMemTables("cur-size-all-mem-tables", "number"),
+SizeAllMemTables("size-all-mem-tables", "number"),
+NumEntriesActiveMemTable("num-entries-active-mem-table", "number"),
+NumEntriesImmMemTables("num-entries-imm-mem-tables", "number"),
+NumDeletesActiveMemTable("num-deletes-active-mem-table", "number"),
+NumDeletesImmMemTables("num-deletes-imm-mem-tables", "number"),
+EstimateNumKeys("estimate-num-keys", "number"),
+EstimateTableReadersMem("estimate-table-readers-mem", "number"),
+NumSnapshots("num-snapshots", "number"),
+NumLiveVersions("num-live-versions", "number"),
+EstimateLiveDataSize("estimate-live-data-size", "number"),
+TotalSstFilesSize("total-sst-files-size", "number"),
+LiveSstFilesSize("live-sst-files-size", "number"),
+EstimatePendingCompactionBytes("estimate-pending-compaction-bytes", 
"number"),
+NumRunningCompactions("num-running-compactions", "number"),
+NumRunningFlushes("num-running-flushes", "number"),
+ActualDelayedWriteRate("actual-delayed-write-rate", "number"),
+IsWriteStopped("is-write-stopped", "number"),
+BlockCacheCapacity("block-cache-capacity", "number"),
+BlockCacheUsage("block-cache-usage", "number"),
+BlockCachePinnedUsage("block-cache-pinned-usage", "number"),
+NumFilesAtLevel("num-files-at-level", "string");
 
 private static final String ROCKS_DB_PROPERTY_FORMAT = "rocksdb.%s";
 
 private static final String CONFIG_KEY_FORMAT = 
"state.backend.rocksdb.metrics.%s";
 
-private final String property;
+private final String propertyName;
 
-RocksDBProperty(String property) {
-this.property = property;
+private final String type;

Review Comment:
   @Zakelly Thanks for the reminder, I have changed it to an enum class. Please 
review.
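   
   A minimal sketch of the enum-typed variant (the names are illustrative; the 
PR diff contains the actual change):
   
   ```java
   // Illustrative: the value type of a RocksDB property, instead of a raw String.
   enum PropertyType {
       NUMBER,
       STRING
   }
   
   enum RocksDBPropertySketch {
       TotalSstFilesSize("total-sst-files-size", PropertyType.NUMBER),
       NumFilesAtLevel("num-files-at-level", PropertyType.STRING);
   
       private final String propertyName;
       private final PropertyType type;
   
       RocksDBPropertySketch(String propertyName, PropertyType type) {
           this.propertyName = propertyName;
           this.type = type;
       }
   
       PropertyType getType() {
           return type;
       }
   }
   ```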



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32367) lead function second param cause ClassCastException

2024-03-10 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825031#comment-17825031
 ] 

Jeyhun Karimov commented on FLINK-32367:


Hi [~zhou_yb], when I tried to reproduce this with the current master 
(d6a4eb966fbc47277e07b79e7c64939a62eb1d54), the LEAD function successfully 
accepts and executes an expression as its parameter. For example, the following 
query works:
{code:java}
SELECT a, b, lead(b, a/2, 3) over (partition by a order by b), lag(b, 1, 3) 
over (partition by a order by b) FROM Table6{code}
Could you please verify, or am I missing something?

> lead function second param cause ClassCastException
> ---
>
> Key: FLINK-32367
> URL: https://issues.apache.org/jira/browse/FLINK-32367
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.4
>Reporter: zhou
>Priority: Major
> Attachments: image-2023-06-16-15-49-49-003.png, 
> image-2023-06-16-18-12-05-861.png
>
>
> !image-2023-06-16-18-12-05-861.png!!image-2023-06-16-15-49-49-003.png!
> lead function second param is expression (window_length/2),throw a exception 
> if lead function second param is number,it worked well



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33579) Join sql error

2024-03-10 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825027#comment-17825027
 ] 

Jeyhun Karimov commented on FLINK-33579:


Hi [~waywtdcc], when I tried to reproduce this locally on the current master 
(d6a4eb966fbc47277e07b79e7c64939a62eb1d54), the above-mentioned exception seems 
to be gone. 

Could you please verify?

> Join sql error
> --
>
> Key: FLINK-33579
> URL: https://issues.apache.org/jira/browse/FLINK-33579
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.17.1
>Reporter: waywtdcc
>Priority: Major
>
>  
> {code:java}
> set pipeline.operator-chaining=true;
>  set execution.runtime-mode=BATCH;
>   set table.exec.disabled-operators = NestedLoopJoin;
> explain plan for
> select
> *
> from
> orders,
> supplier,
> customer
> where
> c_custkey = o_custkey and
> c_nationkey = s_nationkey {code}
>  
>  
>  
> error:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
>  
> FlinkLogicalJoin(condition=[AND(=($21, $2), =($24, $15))], joinType=[inner])
> :- FlinkLogicalJoin(condition=[true], joinType=[inner])
> :  :- FlinkLogicalTableSourceScan(table=[[paimon, tpch100g_paimon, orders]], 
> fields=[uuid, o_orderkey, o_custkey, o_orderstatus, o_totalprice, 
> o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, ts])
> :  +- FlinkLogicalTableSourceScan(table=[[paimon, tpch100g_paimon, 
> supplier]], fields=[uuid, s_suppkey, s_name, s_address, s_nationkey, s_phone, 
> s_acctbal, s_comment, ts])
> +- FlinkLogicalTableSourceScan(table=[[paimon, tpch100g_paimon, customer]], 
> fields=[uuid, c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, 
> c_mktsegment, c_comment, ts])
>  
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
>  
> at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70)
> at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
> at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
> at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
> at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
> at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
> at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
> at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
> at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:329)
> at 
> org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:541)
> at 
> org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:115)
> at 
> org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:47)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:620)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentInternal.explainInternal(TableEnvironmentInternal.java:96)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1296)
> at 
> 

[jira] [Updated] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered

2024-03-10 Thread Vincent Woo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vincent Woo updated FLINK-34636:

Description: 
Based on the observation of logs and metrics, it was found that subtasks 
deployed on the same TM consistently reported a "requesting exclusive 
buffers timeout" exception. It was discovered that during the restart 
process, the *Network* metric remained unchanged (heap memory usage did 
change). I suspect that the network buffer memory was not properly released 
during the restart, which caused the newly deployed tasks to fail to obtain 
network buffers. This problem persisted despite repeated restarts, and the 
application failed to recover automatically.

(I'm not sure if there are other reasons for this issue)

Attached below are screenshots of the exception stack and relevant metrics:
{code:java}
2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
failure cause: java.io.IOException: Timeout triggered when requesting exclusive 
buffers: The total number of network buffers is currently set to 32768 of 32768 
bytes each. You can increase this number by setting the configuration keys 
'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
'taskmanager.memory.network.max',  or you may increase the timeout which is 
3ms by setting the key 
'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
  
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
  
at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
  
at 
org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
  
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)  
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)  
at java.lang.Thread.run(Thread.java:748) {code}
!image-20240308101407396.png|width=866,height=171!

Network metric: only this TM is constantly at 100%, without any variation.

!image-20240308100308649.png|width=868,height=338!

The tasks deployed to this TM cannot reach RUNNING, and their status changes 
only slowly.

!image-20240308101008765.png|width=869,height=118!

Although the root exception thrown by the application is 
PartitionNotFoundException, the actual underlying root cause found in the 
logs is IOException: Timeout triggered when requesting exclusive buffers.

!image-20240308101934756.png|width=869,height=394!

  was:
Based on the observation of logs and metrics, it was found that subtasks 
deployed on the same TM consistently reported a "requesting exclusive 
buffers timeout" exception. It was discovered that during the restart 
process, the *Network* metric remained unchanged (heap memory usage did 
change). I suspect that the network buffer memory was not properly released 
during the restart, which caused the newly deployed tasks to fail to obtain 
network buffers. This problem persisted despite repeated restarts, and the 
application failed to recover automatically.

(I'm not sure if there are other reasons for this issue)

Attached below are screenshots of the exception stack and relevant metrics:
{code:java}
2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
failure cause: java.io.IOException: Timeout triggered when requesting exclusive 
buffers: The total number of network buffers is currently set to 32768 of 32768 
bytes each. You can increase this number by setting the configuration keys 
'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
'taskmanager.memory.network.max',  or you may increase the timeout which is 
3ms by setting the key 
'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
  
at 

[jira] [Updated] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered

2024-03-10 Thread Vincent Woo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vincent Woo updated FLINK-34636:

Description: 
Based on the observation of logs and metrics, it was found that subtasks 
deployed on the same TM consistently reported a "requesting exclusive 
buffers timeout" exception. It was discovered that during the restart 
process, the *Network* metric remained unchanged (heap memory usage did 
change). I suspect that the network buffer memory was not properly released 
during the restart, which caused the newly deployed tasks to fail to obtain 
network buffers. This problem persisted despite repeated restarts, and the 
application failed to recover automatically.

(I'm not sure if there are other reasons for this issue)

Attached below are screenshots of the exception stack and relevant metrics:
{code:java}
2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
failure cause: java.io.IOException: Timeout triggered when requesting exclusive 
buffers: The total number of network buffers is currently set to 32768 of 32768 
bytes each. You can increase this number by setting the configuration keys 
'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
'taskmanager.memory.network.max',  or you may increase the timeout which is 
3ms by setting the key 
'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
  
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
  
at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
  
at 
org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
  
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)  
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)  
at java.lang.Thread.run(Thread.java:748) {code}
!image-20240308101407396.png|width=866,height=171!

Network metric: only this TM is constantly at 100%, without any variation.

!image-20240308100308649.png|width=688,height=268!

The tasks deployed to this TM cannot reach RUNNING, and their status changes 
only slowly.

!image-20240308101008765.png|width=567,height=77!

Although the root exception thrown by the application is 
PartitionNotFoundException, the actual underlying root cause found in the 
logs is IOException: Timeout triggered when requesting exclusive buffers.

!image-20240308101934756.png|width=567,height=257!

  was:
Based on the observation of logs and metrics, it was found that subtasks 
deployed on the same TM consistently reported a "requesting exclusive 
buffers timeout" exception. It was discovered that during the restart 
process, the *Network* metric remained unchanged (heap memory usage did 
change). I suspect that the network buffer memory was not properly released 
during the restart, which caused the newly deployed tasks to fail to obtain 
network buffers. This problem persisted despite repeated restarts, and the 
application failed to recover automatically.

(I'm not sure if there are other reasons for this issue)

Attached below are screenshots of the exception stack and relevant metrics:
{code:java}
2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
failure cause: java.io.IOException: Timeout triggered when requesting exclusive 
buffers: The total number of network buffers is currently set to 32768 of 32768 
bytes each. You can increase this number by setting the configuration keys 
'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
'taskmanager.memory.network.max',  or you may increase the timeout which is 
3ms by setting the key 
'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
  
at 

[jira] [Updated] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered

2024-03-10 Thread Vincent Woo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vincent Woo updated FLINK-34636:

Description: 
Based on the observation of logs and metrics, it was found that subtasks 
deployed on the same TM consistently reported a "requesting exclusive 
buffers timeout" exception. It was discovered that during the restart 
process, the *Network* metric remained unchanged (heap memory usage did 
change). I suspect that the network buffer memory was not properly released 
during the restart, which caused the newly deployed tasks to fail to obtain 
network buffers. This problem persisted despite repeated restarts, and the 
application failed to recover automatically.

(I'm not sure if there are other reasons for this issue)

Attached below are screenshots of the exception stack and relevant metrics:
{code:java}
2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
failure cause: java.io.IOException: Timeout triggered when requesting exclusive 
buffers: The total number of network buffers is currently set to 32768 of 32768 
bytes each. You can increase this number by setting the configuration keys 
'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
'taskmanager.memory.network.max',  or you may increase the timeout which is 
3ms by setting the key 
'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
  
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
  
at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
  
at 
org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
  
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)  
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)  
at java.lang.Thread.run(Thread.java:748) {code}
!image-20240308101407396.png|width=577,height=114!

Network metric: only this TM is constantly at 100%, without any variation.

!image-20240308100308649.png|width=570,height=222!

The tasks deployed to this TM cannot reach RUNNING, and their status changes 
only slowly.

!image-20240308101008765.png|width=567,height=77!

Although the root exception thrown by the application is 
PartitionNotFoundException, the actual underlying root cause found in the 
logs is IOException: Timeout triggered when requesting exclusive buffers.

!image-20240308101934756.png|width=567,height=257!

  was:
Based on the observation of logs and metrics, it was found that subtasks 
deployed on the same TM consistently reported a "requesting exclusive 
buffers timeout" exception. It was discovered that during the restart 
process, the *Network* metric remained unchanged (heap memory usage did 
change). I suspect that the network buffer memory was not properly released 
during the restart, which caused the newly deployed tasks to fail to obtain 
network buffers. This problem persisted despite repeated restarts, and the 
application failed to recover automatically.

(I'm not sure if there are other reasons for this issue)

Attached below are screenshots of the exception stack and relevant metrics:
{code:java}
2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
failure cause: java.io.IOException: Timeout triggered when requesting exclusive 
buffers: The total number of network buffers is currently set to 32768 of 32768 
bytes each. You can increase this number by setting the configuration keys 
'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
'taskmanager.memory.network.max',  or you may increase the timeout which is 
3ms by setting the key 
'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
  
at 

[jira] [Created] (FLINK-34636) Requesting exclusive buffers timeout causes repeated restarts and cannot be automatically recovered

2024-03-10 Thread Vincent Woo (Jira)
Vincent Woo created FLINK-34636:
---

 Summary: Requesting exclusive buffers timeout causes repeated 
restarts and cannot be automatically recovered
 Key: FLINK-34636
 URL: https://issues.apache.org/jira/browse/FLINK-34636
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Vincent Woo
 Attachments: image-20240308100308649.png, image-20240308101008765.png, 
image-20240308101407396.png, image-20240308101934756.png

Based on the observation of logs and metrics, it was found that subtasks 
deployed on the same TM consistently reported a "requesting exclusive 
buffers timeout" exception. It was discovered that during the restart 
process, the *Network* metric remained unchanged (heap memory usage did 
change). I suspect that the network buffer memory was not properly released 
during the restart, which caused the newly deployed tasks to fail to obtain 
network buffers. This problem persisted despite repeated restarts, and the 
application failed to recover automatically.

(I'm not sure if there are other reasons for this issue)

Attached below are screenshots of the exception stack and relevant metrics:
{code:java}
2024-03-08 09:58:18,738 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - GroupWindowAggregate switched from DEPLOYING to FAILED with 
failure cause: java.io.IOException: Timeout triggered when requesting exclusive 
buffers: The total number of network buffers is currently set to 32768 of 32768 
bytes each. You can increase this number by setting the configuration keys 
'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
'taskmanager.memory.network.max',  or you may increase the timeout which is 
3ms by setting the key 
'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:246)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:169)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:247)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:427)
  
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:257)
  
at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:84)
  
at 
org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:952)
  
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)  
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)  
at java.lang.Thread.run(Thread.java:748) {code}
!image-20240308101407396.png!

Network metric: only this TM is constantly at 100%, without any variation.

!image-20240308100308649.png|width=2540,height=989!

The tasks deployed to this TM cannot reach RUNNING, and their status changes 
only slowly.

!image-20240308101008765.png!

Although the root exception thrown by the application is 
PartitionNotFoundException, the actual underlying root cause found in the 
logs is IOException: Timeout triggered when requesting exclusive buffers.

!image-20240308101934756.png!
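
Until the suspected leak is understood, a possible mitigation is to raise the 
options named in the exception. Below is a minimal local-environment sketch; 
the values are illustrative only, not a recommendation, and on a real cluster 
the same keys would go into flink-conf.yaml:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NetworkMemoryTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Enlarge the network memory budget.
        conf.setString("taskmanager.memory.network.fraction", "0.2");
        conf.setString("taskmanager.memory.network.min", "256mb");
        conf.setString("taskmanager.memory.network.max", "2gb");
        // Give restarts more time before the exclusive-buffer request fails.
        conf.setString(
                "taskmanager.network.memory.exclusive-buffers-request-timeout-ms",
                "60000");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);
        // ... build and execute the job as usual.
    }
}
{code}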



--
This message was sent by Atlassian Jira
(v8.20.10#820010)