Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
zhuzhurk closed pull request #24475: [FLINK-32513][core] Add predecessor caching
URL: https://github.com/apache/flink/pull/24475

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-2014868333

> Looks good to me. Thanks for addressing all the comments! @jeyhunkarimov Would you squash the last two commits and rebase the changes onto the latest Flink master branch?

Thanks a lot @zhuzhurk for your reviews. Done
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1533318682

## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/GetTransitivePredecessorsTest.java: ##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@code getTransitivePredecessors} method of {@link Transformation}. */
+class GetTransitivePredecessorsTest extends TestLogger {

Review Comment:
` extends TestLogger` is no longer needed since JUnit5. The logging will be enabled automatically.

## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/GetTransitivePredecessorsTest.java: ##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@code getTransitivePredecessors} method of {@link Transformation}. */
+class GetTransitivePredecessorsTest extends TestLogger {
+
+    private Transformation commonNode;
+    private Transformation midNode;
+
+    @BeforeEach
+    void setup() {
+        commonNode = new TestTransformation<>("commonNode", new MockIntegerTypeInfo(), 1);
+        midNode =
+                new OneInputTransformation<>(
+                        commonNode,
+                        "midNode",
+                        new DummyOneInputOperator(),
+                        new MockIntegerTypeInfo(),
+                        1);
+    }
+
+    @Test
+    void testTwoInputTransformation() {
+        Transformation topNode =
+                new TwoInputTransformation<>(
+                        commonNode,
+                        midNode,
+                        "topNode",
+                        new DummyTwoInputOperator<>(),
+                        midNode.getOutputType(),
+                        1);
+        List<Transformation<?>> predecessors =
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-2002153186

@flinkbot run azure
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-2002135965

@flinkbot run azure
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-2002128082

@flinkbot run azure
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1524777467

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractMultipleInputTransformation.java: ##
@@ -76,10 +78,10 @@ public StreamOperatorFactory getOperatorFactory() {
     }
     @Override
-    public List<Transformation<?>> getTransitivePredecessors() {

Review Comment:
Well. I just noticed a potential bug that the transformation itself is not added to the predecessors, unlike all other transformations.
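
The difference described in this comment can be illustrated with a minimal sketch. It assumes a hypothetical `Node` type (not Flink's `Transformation`), and the method names `predecessorsOfInputsOnly` and `predecessors` are invented for the illustration. One method collects only the predecessors of the inputs, the other also adds the node itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SelfInclusionSketch {

    /** Hypothetical node type for illustration only; it is not a Flink class. */
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }

        /** Predecessors of the inputs only: the node itself is missing from the result. */
        List<Node> predecessorsOfInputsOnly() {
            return inputs.stream()
                    .flatMap(input -> input.predecessors().stream())
                    .distinct()
                    .collect(Collectors.toList());
        }

        /** The node itself first, followed by the deduplicated predecessors of its inputs. */
        List<Node> predecessors() {
            return Stream.concat(Stream.of(this), predecessorsOfInputsOnly().stream())
                    .distinct()
                    .collect(Collectors.toList());
        }
    }

    /** Builds a diamond: commonNode feeds midNode, and both feed topNode. */
    static Node diamondTop() {
        Node common = new Node("commonNode");
        Node mid = new Node("midNode", common);
        return new Node("topNode", mid, common);
    }

    public static void main(String[] args) {
        Node top = diamondTop();
        System.out.println("inputs only: " + top.predecessorsOfInputsOnly().size());
        System.out.println("with self:   " + top.predecessors().size());
    }
}
```

In this sketch the two results differ by exactly one element, the top node, which is the kind of discrepancy a size assertion in a test would surface.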
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1524746682

## flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java: ##
@@ -125,19 +145,34 @@ public void testDeclareManagedMemorySlotScopeUseCaseFailWrongScope() {
     /** A test implementation of {@link Transformation}. */
     private static class TestTransformation extends Transformation {
-
-        public TestTransformation(String name, TypeInformation outputType, int parallelism) {

Review Comment:
It's better to introduce a `TestTwoInputTransformation` which inherits `TwoInputTransformation`. Otherwise we are testing some testing logic instead of production logic.
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1524742606

## flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java: ##
@@ -42,10 +43,21 @@ public class TransformationTest extends TestLogger {
     private Transformation transformation;
+    private Transformation transformationWithInput;
     @Before
     public void setUp() {
         transformation = new TestTransformation<>("t", null, 1);
+        transformationWithInput =
+                new TestTransformationWithInput<>(
+                        "t", null, 1, Arrays.asList(transformation, transformation));
+    }
+
+    @Test
+    public void testPredecessorCache() throws Exception {
+        transformationWithInput.getTransitivePredecessors();
+        assertEquals(transformationWithInput.getPredecessorsCache().size(), 1);
+        assertEquals(((TestTransformation) transformation).getNumGetTransitivePredecessor(), 1);
     }

Review Comment:
Can we verify the result of `topNode.getTransitivePredecessors()`? Ideally it should be 3. But without the fix it will be 4.
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1524746682

## flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java: ##
@@ -125,19 +145,34 @@ public void testDeclareManagedMemorySlotScopeUseCaseFailWrongScope() {
     /** A test implementation of {@link Transformation}. */
     private static class TestTransformation extends Transformation {
-
-        public TestTransformation(String name, TypeInformation outputType, int parallelism) {

Review Comment:
It's better to introduce a `TestMultipleInputTransformation` which inherits `AbstractMultipleInputTransformation`. Otherwise we are testing some testing logic instead of production logic.

## flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java: ##
@@ -45,7 +47,25 @@ public class TransformationTest extends TestLogger {
     @Before
     public void setUp() {
-        transformation = new TestTransformation<>("t", null, 1);
+        transformation = new TestTransformation<>("t", null, 1, null);
+    }
+
+    @Test
+    public void testPredecessorCache() throws Exception {
+        // diamond-like transformation:
+        // --> midNode-\
+        // ---/ --\
+        // --/->
+        // commonNode -> topNode
+        Transformation commonNode = new TestTransformation<>("commonNode", null, 1, null);
+        Transformation midNode =
+                new TestTransformation<>("midNode", null, 1, Collections.singletonList(commonNode));
+        Transformation topNode =
+                new TestTransformation<>("topNode", null, 1, Arrays.asList(midNode, commonNode));
+
+        topNode.getTransitivePredecessors();
+        assertEquals(topNode.getPredecessorsCache().size(), 1);

Review Comment:
Flink is turning to JUnit5. According to the community guide[1], new tests should be written using JUnit5. If we have to modify an existing testing class, we should migrate it to JUnit5 first in a hotfix commit, for example, [#20350](https://github.com/apache/flink/pull/20350/commits).

[1] https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#tooling

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractBroadcastStateTransformation.java: ##
@@ -103,7 +103,7 @@ public void setChainingStrategy(ChainingStrategy chainingStrategy) {
     }
     @Override
-    public List<Transformation<?>> getTransitivePredecessors() {
+    protected List<Transformation<?>> getTransitivePredecessorsInternal() {

Review Comment:
This method, and a few other methods like in TwoInputTransformation and UnionTransformation, need to be fixed too. And it's better to introduce a test for each of them. Maybe we can introduce a separate test class `GetTransitivePredecessorsTest` to host them all.

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractMultipleInputTransformation.java: ##
@@ -76,10 +78,10 @@ public StreamOperatorFactory getOperatorFactory() {
     }
     @Override
-    public List<Transformation<?>> getTransitivePredecessors() {

Review Comment:
A simpler way can be
```
return inputs.stream()
        .flatMap(input -> input.getTransitivePredecessors().stream())
        .distinct()
        .collect(Collectors.toList());
```

## flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java: ##
@@ -42,10 +43,21 @@ public class TransformationTest extends TestLogger {
     private Transformation transformation;
+    private Transformation transformationWithInput;
     @Before
     public void setUp() {
         transformation = new TestTransformation<>("t", null, 1);
+        transformationWithInput =
+                new TestTransformationWithInput<>(
+                        "t", null, 1, Arrays.asList(transformation, transformation));
+    }
+
+    @Test
+    public void testPredecessorCache() throws Exception {
+        transformationWithInput.getTransitivePredecessors();
+        assertEquals(transformationWithInput.getPredecessorsCache().size(), 1);
+        assertEquals(((TestTransformation) transformation).getNumGetTransitivePredecessor(), 1);
     }

Review Comment:
Can we verify the result of `topNode.getTransitivePredecessors()`? Ideally it should be 2. But without the fix it will be 3.
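
The diffs in this message rename the overridden method to `getTransitivePredecessorsInternal()` while callers keep using `getTransitivePredecessors()`. One way such a split can support caching is sketched below. This is an assumption-laden illustration, not the PR's actual code: the `Node` and `SimpleNode` classes, the `cachedPredecessors` field, and the `internalCalls` counter are all hypothetical, and only the two method names are taken from the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class PredecessorCacheSketch {

    /** Hypothetical base class; it only mimics the method split seen in the diffs. */
    abstract static class Node {
        private List<Node> cachedPredecessors; // stays null until the first computation
        int internalCalls = 0; // counts real traversals, for demonstration only

        /** Public entry point: computes the result once, then serves the cached list. */
        final List<Node> getTransitivePredecessors() {
            if (cachedPredecessors == null) {
                internalCalls++;
                cachedPredecessors = getTransitivePredecessorsInternal();
            }
            return cachedPredecessors;
        }

        /** Subclasses implement the actual traversal here. */
        protected abstract List<Node> getTransitivePredecessorsInternal();
    }

    /** Hypothetical concrete node with an arbitrary number of inputs. */
    static final class SimpleNode extends Node {
        final List<Node> inputs;

        SimpleNode(Node... inputs) {
            this.inputs = Arrays.asList(inputs);
        }

        @Override
        protected List<Node> getTransitivePredecessorsInternal() {
            Set<Node> result = new LinkedHashSet<>();
            result.add(this);
            for (Node input : inputs) {
                // Goes through the caching entry point, so shared inputs are traversed once.
                result.addAll(input.getTransitivePredecessors());
            }
            return new ArrayList<>(result);
        }
    }

    public static void main(String[] args) {
        SimpleNode common = new SimpleNode();
        SimpleNode mid = new SimpleNode(common);
        SimpleNode top = new SimpleNode(mid, common);
        System.out.println("predecessors of top: " + top.getTransitivePredecessors().size());
        System.out.println("traversals of common: " + common.internalCalls);
    }
}
```

In this sketch the shared `common` node is reachable over two paths but its internal traversal runs only once, which is the kind of property a traversal-count assertion can check.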
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-1990782761

Thanks @zhuzhurk for the review. I addressed them. Could you please check again?
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1519708804

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractMultipleInputTransformation.java: ##
@@ -76,7 +76,7 @@ public StreamOperatorFactory getOperatorFactory() {
     }
     @Override
-    public List<Transformation<?>> getTransitivePredecessors() {
+    protected List<Transformation<?>> getTransitivePredecessorsInternal() {
         return inputs.stream()
                 .flatMap(input -> input.getTransitivePredecessors().stream())

Review Comment:
One transformation, and all its predecessors, can appear multiple times if it is the head of a diamond DAG structure, like the case reported in FLINK-32513. This may result in lots of memory consumption. Maybe we can introduce a `LinkedHashSet` to do deduplication.

## flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java: ##
@@ -42,10 +43,21 @@ public class TransformationTest extends TestLogger {
     private Transformation transformation;
+    private Transformation transformationWithInput;
     @Before
     public void setUp() {
         transformation = new TestTransformation<>("t", null, 1);
+        transformationWithInput =
+                new TestTransformationWithInput<>(
+                        "t", null, 1, Arrays.asList(transformation, transformation));
+    }
+
+    @Test
+    public void testPredecessorCache() throws Exception {
+        transformationWithInput.getTransitivePredecessors();
+        assertEquals(transformationWithInput.getPredecessorsCache().size(), 1);
+        assertEquals(((TestTransformation) transformation).getNumGetTransitivePredecessor(), 1);
     }

Review Comment:
Could you construct a case that is demonstrated in FLINK-32513 and verify the traversing count and predecessor count?
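
To make the memory concern concrete, here is a small sketch of how a naive list-based traversal grows on stacked diamonds while a `LinkedHashSet`-based one stays linear. Everything in it is hypothetical: the `Node` class, both traversal methods, and the `chainOfDiamonds` helper are invented for illustration and are not taken from Flink or from FLINK-32513.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class DiamondDedupSketch {

    /** Hypothetical stand-in for a transformation; it is not a Flink class. */
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, List<Node> inputs) {
            this.name = name;
            this.inputs = inputs;
        }

        /** Naive traversal: a shared ancestor is repeated once per path that reaches it. */
        List<Node> predecessorsNaive() {
            List<Node> result = new ArrayList<>();
            result.add(this);
            for (Node input : inputs) {
                result.addAll(input.predecessorsNaive());
            }
            return result;
        }

        /** Deduplicating traversal: a LinkedHashSet keeps first-seen order and drops repeats. */
        List<Node> predecessorsDeduplicated() {
            Set<Node> seen = new LinkedHashSet<>();
            seen.add(this);
            for (Node input : inputs) {
                seen.addAll(input.predecessorsDeduplicated());
            }
            return new ArrayList<>(seen);
        }
    }

    /** Stacks {@code depth} diamonds: each level adds a mid node and a top node. */
    static Node chainOfDiamonds(int depth) {
        Node current = new Node("source", Collections.<Node>emptyList());
        for (int i = 0; i < depth; i++) {
            Node mid = new Node("mid" + i, Collections.singletonList(current));
            current = new Node("top" + i, Arrays.asList(current, mid));
        }
        return current;
    }

    public static void main(String[] args) {
        for (int depth = 1; depth <= 10; depth++) {
            Node top = chainOfDiamonds(depth);
            System.out.println(
                    "depth " + depth
                            + ": naive=" + top.predecessorsNaive().size()
                            + ", deduplicated=" + top.predecessorsDeduplicated().size());
        }
    }
}
```

With a single diamond the naive list has 4 entries against 3 distinct nodes. Each extra level roughly doubles the naive list, while the deduplicated result grows by two nodes per level. Note that this variant only bounds the size of the result; the traversal itself still revisits shared nodes unless results are also cached.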
Re: [PR] [FLINK-32513][core] Add predecessor caching [flink]
jeyhunkarimov commented on PR #24475:
URL: https://github.com/apache/flink/pull/24475#issuecomment-1987359390

Hi @zhuzhurk, could you please review the PR in your available time? Thanks!