Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-22 Thread via GitHub


zhuzhurk closed pull request #24475: [FLINK-32513][core] Add predecessor caching
URL: https://github.com/apache/flink/pull/24475


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-22 Thread via GitHub


jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-2014868333

   > Looks good to me. Thanks for addressing all the comments! @jeyhunkarimov Would you squash the last two commits and rebase the changes onto the latest Flink master branch?
   
   Thanks a lot @zhuzhurk for your reviews. Done.




Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-21 Thread via GitHub


zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1533318682


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/GetTransitivePredecessorsTest.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@code getTransitivePredecessors} method of {@link Transformation}. */
+class GetTransitivePredecessorsTest extends TestLogger {

Review Comment:
   `extends TestLogger` is no longer needed with JUnit 5; logging is enabled automatically.



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/GetTransitivePredecessorsTest.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@code getTransitivePredecessors} method of {@link Transformation}. */
+class GetTransitivePredecessorsTest extends TestLogger {
+
+private Transformation commonNode;
+private Transformation midNode;
+
+@BeforeEach
+void setup() {
+commonNode = new TestTransformation<>("commonNode", new MockIntegerTypeInfo(), 1);
+midNode =
+new OneInputTransformation<>(
+commonNode,
+"midNode",
+new DummyOneInputOperator(),
+new MockIntegerTypeInfo(),
+1);
+}
+
+@Test
+void testTwoInputTransformation() {
+Transformation topNode =
+new TwoInputTransformation<>(
+commonNode,
+midNode,
+"topNode",
+new DummyTwoInputOperator<>(),
+midNode.getOutputType(),
+1);
+List> predecessors = 

Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-16 Thread via GitHub


jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-2002153186

   @flinkbot run azure




Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-16 Thread via GitHub


jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-2002135965

   @flinkbot run azure




Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-16 Thread via GitHub


jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-2002128082

   @flinkbot run azure




Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-14 Thread via GitHub


zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1524777467


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractMultipleInputTransformation.java:
##
@@ -76,10 +78,10 @@ public StreamOperatorFactory getOperatorFactory() {
 }
 
 @Override
-public List<Transformation<?>> getTransitivePredecessors() {

Review Comment:
   Well, I just noticed a potential bug: unlike all other transformations, this one does not add itself to its predecessors.
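To illustrate the convention referred to here, below is a minimal sketch on a hypothetical `SelfIncludingNode` class (an assumption for illustration, not Flink's `Transformation` API). Nodes that follow the convention put themselves first and then append their inputs' predecessors; the buggy variant only collects the inputs' predecessors, so the node itself is missing from the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical DAG node used only to illustrate the convention; not Flink's Transformation. */
class SelfIncludingNode {
    final String name;
    final List<SelfIncludingNode> inputs;

    SelfIncludingNode(String name, SelfIncludingNode... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    /** Convention of the other transformations: this node first, then its inputs' predecessors. */
    List<SelfIncludingNode> predecessorsIncludingSelf() {
        List<SelfIncludingNode> result = new ArrayList<>();
        result.add(this);
        for (SelfIncludingNode input : inputs) {
            result.addAll(input.predecessorsIncludingSelf());
        }
        return result;
    }

    /** The reported bug: only the inputs' predecessors are collected, so this node is missing. */
    List<SelfIncludingNode> predecessorsMissingSelf() {
        List<SelfIncludingNode> result = new ArrayList<>();
        for (SelfIncludingNode input : inputs) {
            result.addAll(input.predecessorsIncludingSelf());
        }
        return result;
    }

    @Override
    public String toString() {
        return name;
    }

    public static void main(String[] args) {
        SelfIncludingNode a = new SelfIncludingNode("a");
        SelfIncludingNode b = new SelfIncludingNode("b");
        SelfIncludingNode multi = new SelfIncludingNode("multi", a, b);
        System.out.println(multi.predecessorsIncludingSelf()); // [multi, a, b]
        System.out.println(multi.predecessorsMissingSelf()); // [a, b]
    }
}
```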





Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-14 Thread via GitHub


zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1524746682


##
flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java:
##
@@ -125,19 +145,34 @@ public void testDeclareManagedMemorySlotScopeUseCaseFailWrongScope() {
 
 /** A test implementation of {@link Transformation}. */
 private static class TestTransformation extends Transformation {
-
-public TestTransformation(String name, TypeInformation outputType, int parallelism) {

Review Comment:
   It's better to introduce a `TestTwoInputTransformation` that extends `TwoInputTransformation`.
   Otherwise, we are testing test logic instead of production logic.
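The idea can be sketched with hypothetical classes (assumptions for illustration, not Flink's): a test-only subclass extends the production class and only adds instrumentation, here a call counter, so the assertions run against the inherited production logic rather than against a reimplementation living in the test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TestSubclassSketch {
    /** Hypothetical "production" class standing in for e.g. TwoInputTransformation. */
    static class ProductionNode {
        private final List<ProductionNode> inputs;

        ProductionNode(ProductionNode... inputs) {
            this.inputs = Arrays.asList(inputs);
        }

        /** The production logic that the test should exercise. */
        List<ProductionNode> getTransitivePredecessors() {
            List<ProductionNode> result = new ArrayList<>();
            result.add(this);
            for (ProductionNode input : inputs) {
                result.addAll(input.getTransitivePredecessors());
            }
            return result;
        }
    }

    /** Test-only subclass: adds a call counter but delegates the real work to the production code. */
    static class CountingNode extends ProductionNode {
        int calls = 0;

        CountingNode(ProductionNode... inputs) {
            super(inputs);
        }

        @Override
        List<ProductionNode> getTransitivePredecessors() {
            calls++;
            return super.getTransitivePredecessors();
        }
    }

    public static void main(String[] args) {
        CountingNode source = new CountingNode();
        CountingNode top = new CountingNode(source, source);
        System.out.println(top.getTransitivePredecessors().size()); // 3
        System.out.println(source.calls); // 2
    }
}
```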





Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-14 Thread via GitHub


zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1524742606


##
flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java:
##
@@ -42,10 +43,21 @@
 public class TransformationTest extends TestLogger {
 
 private Transformation transformation;
+private Transformation transformationWithInput;
 
 @Before
 public void setUp() {
 transformation = new TestTransformation<>("t", null, 1);
+transformationWithInput =
+new TestTransformationWithInput<>(
+"t", null, 1, Arrays.asList(transformation, transformation));
+}
+
+@Test
+public void testPredecessorCache() throws Exception {
+transformationWithInput.getTransitivePredecessors();
+assertEquals(transformationWithInput.getPredecessorsCache().size(), 1);
+assertEquals(((TestTransformation) transformation).getNumGetTransitivePredecessor(), 1);
 }

Review Comment:
   Can we verify the result of `topNode.getTransitivePredecessors()`?
   Ideally its size should be 3, but without the fix it will be 4.
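A minimal sketch of what this check distinguishes, assuming every node includes itself in its result (the convention of the other transformations) and using a hypothetical `DiamondNode` class rather than Flink's: without deduplication, `commonNode` is reached through both paths and collected twice, giving 4 entries instead of 3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical DAG node mirroring the test's commonNode/midNode/topNode; not Flink's API. */
class DiamondNode {
    final String name;
    final List<DiamondNode> inputs;

    DiamondNode(String name, DiamondNode... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    /** Without the fix: a node reachable via two paths is collected twice. */
    List<DiamondNode> predecessorsWithDuplicates() {
        List<DiamondNode> result = new ArrayList<>();
        result.add(this);
        for (DiamondNode input : inputs) {
            result.addAll(input.predecessorsWithDuplicates());
        }
        return result;
    }

    /** With deduplication: each node appears once, in first-encounter order. */
    List<DiamondNode> predecessorsDeduplicated() {
        Set<DiamondNode> result = new LinkedHashSet<>();
        result.add(this);
        for (DiamondNode input : inputs) {
            result.addAll(input.predecessorsDeduplicated());
        }
        return new ArrayList<>(result);
    }

    @Override
    public String toString() {
        return name;
    }

    public static void main(String[] args) {
        DiamondNode commonNode = new DiamondNode("commonNode");
        DiamondNode midNode = new DiamondNode("midNode", commonNode);
        DiamondNode topNode = new DiamondNode("topNode", midNode, commonNode);
        System.out.println(topNode.predecessorsWithDuplicates()); // [topNode, midNode, commonNode, commonNode]
        System.out.println(topNode.predecessorsDeduplicated()); // [topNode, midNode, commonNode]
    }
}
```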





Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-14 Thread via GitHub


zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1524746682


##
flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java:
##
@@ -125,19 +145,34 @@ public void testDeclareManagedMemorySlotScopeUseCaseFailWrongScope() {
 
 /** A test implementation of {@link Transformation}. */
 private static class TestTransformation extends Transformation {
-
-public TestTransformation(String name, TypeInformation outputType, int parallelism) {

Review Comment:
   It's better to introduce a `TestMultipleInputTransformation` that extends `AbstractMultipleInputTransformation`.
   Otherwise, we are testing test logic instead of production logic.



##
flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java:
##
@@ -45,7 +47,25 @@ public class TransformationTest extends TestLogger {
 
 @Before
 public void setUp() {
-transformation = new TestTransformation<>("t", null, 1);
+transformation = new TestTransformation<>("t", null, 1, null);
+}
+
+@Test
+public void testPredecessorCache() throws Exception {
+// diamond-like transformation:
+//               --> midNode --
+//              /              \
+// commonNode --                --> topNode
+//              \              /
+//               --------------
+Transformation commonNode = new TestTransformation<>("commonNode", null, 1, null);
+Transformation midNode =
+new TestTransformation<>("midNode", null, 1, Collections.singletonList(commonNode));
+Transformation topNode =
+new TestTransformation<>("topNode", null, 1, Arrays.asList(midNode, commonNode));
+
+topNode.getTransitivePredecessors();
+assertEquals(topNode.getPredecessorsCache().size(), 1);

Review Comment:
   Flink is migrating to JUnit 5. According to the community guide [1], new tests should be written using JUnit 5. If we have to modify an existing test class, we should first migrate it to JUnit 5 in a hotfix commit, as in [#20350](https://github.com/apache/flink/pull/20350/commits).
   
   [1] https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#tooling



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractBroadcastStateTransformation.java:
##
@@ -103,7 +103,7 @@ public void setChainingStrategy(ChainingStrategy chainingStrategy) {
 }
 
 @Override
-public List<Transformation<?>> getTransitivePredecessors() {
+protected List<Transformation<?>> getTransitivePredecessorsInternal() {

Review Comment:
   This method, and similar methods in TwoInputTransformation and UnionTransformation, need to be fixed too.
   It would also be better to add a test for each of them, perhaps in a separate test class `GetTransitivePredecessorsTest` that hosts them all.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractMultipleInputTransformation.java:
##
@@ -76,10 +78,10 @@ public StreamOperatorFactory getOperatorFactory() {
 }
 
 @Override
-public List<Transformation<?>> getTransitivePredecessors() {

Review Comment:
   A simpler way could be:
   
   ```java
   return inputs.stream()
           .flatMap(input -> input.getTransitivePredecessors().stream())
           // drop duplicates, keeping the first occurrence of each transformation
           .distinct()
           .collect(Collectors.toList());
   ```
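Below is a self-contained sketch of that suggestion on a hypothetical `StreamNode` class (an assumption for illustration, not Flink's transformations). It relies on `Stream.distinct()`, which compares elements with `equals`/`hashCode` (object identity here) and, for ordered streams such as those created from a `List`, keeps the first occurrence of each element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical source/single-input node; not Flink's Transformation. */
class StreamNode {
    final String name;
    final List<StreamNode> inputs;

    StreamNode(String name, StreamNode... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    /** This node followed by its inputs' deduplicated predecessors. */
    List<StreamNode> getTransitivePredecessors() {
        List<StreamNode> result = new ArrayList<>();
        result.add(this);
        result.addAll(predecessorsOfInputs(inputs));
        return result;
    }

    /** The suggested pipeline: flatten the inputs' predecessors and drop duplicates. */
    static List<StreamNode> predecessorsOfInputs(List<StreamNode> inputs) {
        return inputs.stream()
                .flatMap(input -> input.getTransitivePredecessors().stream())
                .distinct() // stable for ordered streams: first occurrence wins
                .collect(Collectors.toList());
    }

    @Override
    public String toString() {
        return name;
    }

    public static void main(String[] args) {
        StreamNode commonNode = new StreamNode("commonNode");
        StreamNode midNode = new StreamNode("midNode", commonNode);
        // Without distinct() this would be [midNode, commonNode, commonNode].
        System.out.println(predecessorsOfInputs(Arrays.asList(midNode, commonNode))); // [midNode, commonNode]
    }
}
```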



##
flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java:
##
@@ -42,10 +43,21 @@
 public class TransformationTest extends TestLogger {
 
 private Transformation transformation;
+private Transformation transformationWithInput;
 
 @Before
 public void setUp() {
 transformation = new TestTransformation<>("t", null, 1);
+transformationWithInput =
+new TestTransformationWithInput<>(
+"t", null, 1, Arrays.asList(transformation, transformation));
+}
+
+@Test
+public void testPredecessorCache() throws Exception {
+transformationWithInput.getTransitivePredecessors();
+assertEquals(transformationWithInput.getPredecessorsCache().size(), 1);
+assertEquals(((TestTransformation) transformation).getNumGetTransitivePredecessor(), 1);
 }

Review Comment:
   Can we verify the result of `topNode.getTransitivePredecessors()`?
   Ideally its size should be 2, but without the fix it will be 3.





Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-12 Thread via GitHub


jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-1990782761

   Thanks @zhuzhurk for the review. I have addressed the comments. Could you please check again?




Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-11 Thread via GitHub


zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1519708804


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractMultipleInputTransformation.java:
##
@@ -76,7 +76,7 @@ public StreamOperatorFactory getOperatorFactory() {
 }
 
 @Override
-public List<Transformation<?>> getTransitivePredecessors() {
+protected List<Transformation<?>> getTransitivePredecessorsInternal() {
 return inputs.stream()
 .flatMap(input -> input.getTransitivePredecessors().stream())

Review Comment:
   A transformation, together with all its predecessors, can appear multiple times in the result if it is the head of a diamond-shaped DAG, as in the case reported in FLINK-32513. This may result in high memory consumption.
   Maybe we can use a `LinkedHashSet` for deduplication.
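To make the memory concern concrete, here is a minimal sketch on a hypothetical `GrowthNode` class (not Flink's API) that stacks `depth` diamonds, each level consuming the previous node both directly and through a single-input node. With plain lists the result holds 3·2^depth − 2 entries (3070 at depth 10), while deduplicating with a `LinkedHashSet` keeps it at the 2·depth + 1 distinct nodes (21 at depth 10) and preserves insertion order. Deduplication bounds the result size only; the recursion below still revisits shared nodes once per path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical DAG node used to show how duplicates grow in stacked diamonds; not Flink's API. */
class GrowthNode {
    final List<GrowthNode> inputs;

    GrowthNode(GrowthNode... inputs) {
        this.inputs = Arrays.asList(inputs);
    }

    /** Plain list: a shared upstream node is copied once per path that reaches it. */
    List<GrowthNode> predecessorsAsList() {
        List<GrowthNode> result = new ArrayList<>();
        result.add(this);
        for (GrowthNode input : inputs) {
            result.addAll(input.predecessorsAsList());
        }
        return result;
    }

    /** LinkedHashSet: each node kept once, insertion order preserved. */
    List<GrowthNode> predecessorsDeduplicated() {
        Set<GrowthNode> result = new LinkedHashSet<>();
        result.add(this);
        for (GrowthNode input : inputs) {
            result.addAll(input.predecessorsDeduplicated());
        }
        return new ArrayList<>(result);
    }

    /** Each level consumes the previous node directly and via a single-input node. */
    static GrowthNode stackedDiamonds(int depth) {
        GrowthNode current = new GrowthNode();
        for (int i = 0; i < depth; i++) {
            GrowthNode mid = new GrowthNode(current);
            current = new GrowthNode(mid, current);
        }
        return current;
    }

    public static void main(String[] args) {
        for (int depth = 1; depth <= 10; depth++) {
            GrowthNode top = stackedDiamonds(depth);
            System.out.println(depth + ": " + top.predecessorsAsList().size()
                    + " entries vs " + top.predecessorsDeduplicated().size() + " distinct nodes");
        }
    }
}
```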



##
flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java:
##
@@ -42,10 +43,21 @@
 public class TransformationTest extends TestLogger {
 
 private Transformation transformation;
+private Transformation transformationWithInput;
 
 @Before
 public void setUp() {
 transformation = new TestTransformation<>("t", null, 1);
+transformationWithInput =
+new TestTransformationWithInput<>(
+"t", null, 1, Arrays.asList(transformation, transformation));
+}
+
+@Test
+public void testPredecessorCache() throws Exception {
+transformationWithInput.getTransitivePredecessors();
+assertEquals(transformationWithInput.getPredecessorsCache().size(), 1);
+assertEquals(((TestTransformation) transformation).getNumGetTransitivePredecessor(), 1);
 }

Review Comment:
   Could you construct the case described in FLINK-32513 and verify both the traversal count and the predecessor count?
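One way such a check could look is sketched below on a hypothetical `CachedNode` class, loosely modelled on the `getTransitivePredecessors()` / `getTransitivePredecessorsInternal()` split visible in this PR's diffs (the class, counter, and builder are assumptions, not Flink code). Each node caches its deduplicated result on first use, so on `depth` stacked diamonds the internal traversal runs once per node (2·depth + 1 times), whereas the same recursion without a cache runs 3·2^depth − 2 times. The sketch assumes a node's inputs never change after the cache is filled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical node with a per-node predecessor cache; not Flink's Transformation. */
class CachedNode {
    /** Counts how many times the internal traversal runs, across all nodes. */
    static int internalCalls = 0;

    private final List<CachedNode> inputs;
    private List<CachedNode> cachedPredecessors; // filled on first call

    CachedNode(CachedNode... inputs) {
        this.inputs = Arrays.asList(inputs);
    }

    /** Returns the cached result, computing it only on the first call. */
    List<CachedNode> getTransitivePredecessors() {
        if (cachedPredecessors == null) {
            cachedPredecessors = Collections.unmodifiableList(getTransitivePredecessorsInternal());
        }
        return cachedPredecessors;
    }

    /** The actual traversal: this node plus its inputs' (cached) predecessors, deduplicated. */
    private List<CachedNode> getTransitivePredecessorsInternal() {
        internalCalls++;
        Set<CachedNode> result = new LinkedHashSet<>();
        result.add(this);
        for (CachedNode input : inputs) {
            result.addAll(input.getTransitivePredecessors());
        }
        return new ArrayList<>(result);
    }

    /** Each level consumes the previous node directly and via a single-input node. */
    static CachedNode stackedDiamonds(int depth) {
        CachedNode current = new CachedNode();
        for (int i = 0; i < depth; i++) {
            CachedNode mid = new CachedNode(current);
            current = new CachedNode(mid, current);
        }
        return current;
    }

    public static void main(String[] args) {
        CachedNode top = stackedDiamonds(20);
        System.out.println(top.getTransitivePredecessors().size()); // 41
        System.out.println(internalCalls); // 41
    }
}
```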





Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]

2024-03-10 Thread via GitHub


jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-1987359390

   Hi @zhuzhurk, could you please review the PR when you have time? Thanks!

