Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
xintongsong commented on PR #24272: URL: https://github.com/apache/flink/pull/24272#issuecomment-1968909988 @mohitjain2504 It's merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
mohitjain2504 commented on PR #24272: URL: https://github.com/apache/flink/pull/24272#issuecomment-1968895813 What is the reason for closing? Did I miss something?
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
xintongsong closed pull request #24272: [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment URL: https://github.com/apache/flink/pull/24272
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
yunfengzhou-hub commented on code in PR #24272: URL: https://github.com/apache/flink/pull/24272#discussion_r1502520887 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java: ## @@ -61,6 +61,8 @@ public enum CheckpointFailureReason { CHECKPOINT_COORDINATOR_SHUTDOWN(false, "CheckpointCoordinator shutdown."), +CHECKPOINT_COORDINATOR_START(false, "Checkpoint Coordinator is starting."), Review Comment: This message was introduced to resolve the following race condition:
- A test case triggers stop-with-savepoint on a Flink job. During this process, CheckpointCoordinator#stopCheckpointScheduler() is invoked before the savepoint is created.
- The behavior introduced in this PR invokes startCheckpointScheduler() if the running tasks have no blocking edge and the scheduler has not been started yet, and the job state during stop-with-savepoint also satisfies this condition.
- startCheckpointScheduler() first invokes stopCheckpointScheduler(), which cancels all ongoing and pending checkpoints/savepoints. The savepoint triggered by the test case is aborted as a result, and the test case fails.

As a temporary solution, I have changed startCheckpointScheduler() so that it does not invoke stopCheckpointScheduler() if the scheduler has not been started yet, which keeps savepoints from being canceled in this process. Since the race condition above cannot occur once all tasks are running, this solution should be acceptable for now. In the long run, I created FLINK-34519 to record the problem and some basic ideas for improving the checkpointing mechanism for batch/streaming unification optimizations.
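A minimal sketch of the guard described above, assuming a simplified scheduler that keeps pending savepoints in a list. `CheckpointSchedulerSketch`, `triggerSavepoint` and `getPendingSavepoints` are hypothetical names for illustration only, not the real `CheckpointCoordinator` API; the sketch only shows why skipping the reset on a first start spares a savepoint that stop-with-savepoint has already requested.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the checkpoint scheduler; not Flink's CheckpointCoordinator. */
class CheckpointSchedulerSketch {
    private boolean started = false;
    // Savepoints that have been requested but not yet completed.
    private final List<String> pendingSavepoints = new ArrayList<>();

    void triggerSavepoint(String name) {
        pendingSavepoints.add(name);
    }

    void startCheckpointScheduler() {
        // Only reset (and thereby abort in-flight work) if the scheduler is already running.
        // A first start leaves an already-requested savepoint untouched.
        if (started) {
            stopCheckpointScheduler();
        }
        started = true;
    }

    void stopCheckpointScheduler() {
        started = false;
        // Stopping aborts everything that is still in flight.
        pendingSavepoints.clear();
    }

    boolean isStarted() {
        return started;
    }

    List<String> getPendingSavepoints() {
        return pendingSavepoints;
    }
}
```

The sequence from the race condition (stop, request savepoint, start) leaves the savepoint pending, while restarting an already running scheduler still clears in-flight work.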
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
yunfengzhou-hub commented on code in PR #24272: URL: https://github.com/apache/flink/pull/24272#discussion_r1502520887 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java: ## @@ -61,6 +61,8 @@ public enum CheckpointFailureReason { CHECKPOINT_COORDINATOR_SHUTDOWN(false, "CheckpointCoordinator shutdown."), +CHECKPOINT_COORDINATOR_START(false, "Checkpoint Coordinator is starting."), Review Comment: I have added to this PR a more detailed failure reason for cases where a savepoint fails because of blocking edges, which should be enough for now. In the long run, I created FLINK-34519 to record the problem and some basic ideas for improving the checkpointing mechanism for batch/streaming unification optimizations.
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
yunfengzhou-hub commented on code in PR #24272: URL: https://github.com/apache/flink/pull/24272#discussion_r1502324525 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java: ## @@ -76,4 +76,8 @@ public void setSupportsConcurrentExecutionAttempts( boolean supportsConcurrentExecutionAttempts) { this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts; } + +public boolean isOutputOnlyAfterEndOfStream() { +return false; Review Comment: I just realized that this method is only used in the xxxInputTransformationTranslator classes (e.g. OneInputTransformationTranslator), and in all of these usages the calling methods know the concrete subclass of the physical transformation. So I removed this method from PhysicalTransformation and kept only the implementations in the subclasses.
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
xintongsong commented on code in PR #24272: URL: https://github.com/apache/flink/pull/24272#discussion_r1501806777 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorAttributesBuilder.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** The builder class for {@link OperatorAttributes}. */ +@Experimental +public class OperatorAttributesBuilder { + +private static final Logger LOG = LoggerFactory.getLogger(OperatorAttributesBuilder.class); +@Nullable private Boolean outputOnlyAfterEndOfStream = null; Review Comment: I agree with @mohitjain2504's opinion. In this particular case, I don't think it's necessary to distinguish the two cases. It might be nicer to keep it simple.
## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java: ## @@ -31,19 +35,27 @@ public class CheckpointCoordinatorDeActivator implements JobStatusListener { private final CheckpointCoordinator coordinator; +private final Map tasks; -public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) { +public CheckpointCoordinatorDeActivator( +CheckpointCoordinator coordinator, Map tasks) { this.coordinator = checkNotNull(coordinator); +this.tasks = checkNotNull(tasks); } @Override public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) { -if (newJobStatus == JobStatus.RUNNING) { -// start the checkpoint scheduler +if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking()) { +// start the checkpoint scheduler if there is no blocking edge coordinator.startCheckpointScheduler(); } else { // anything else should stop the trigger for now coordinator.stopCheckpointScheduler(); } } + +private boolean allTasksOutputNonBlocking() { +return tasks.values().stream() +.noneMatch(vertex -> vertex.getJobVertex().isAnyOutputBlocking()); Review Comment: The value of this never changes, so it should be calculated once at initialization rather than every time `jobStatusChanges` is called. ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java: ## @@ -61,6 +61,8 @@ public enum CheckpointFailureReason { CHECKPOINT_COORDINATOR_SHUTDOWN(false, "CheckpointCoordinator shutdown."), +CHECKPOINT_COORDINATOR_START(false, "Checkpoint Coordinator is starting."), Review Comment: The message is confusing. 1. Judging from how it is used, it is not about starting the checkpoint coordinator, but about canceling potential prior timers before scheduling a new one. 2. I don't really see how this could happen. Does it come from first stopping the checkpoint scheduler and then starting it again? If so, shouldn't the timers already have been canceled when the checkpoint scheduler was stopped?
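A minimal sketch of computing the flag once, assuming the set of vertices is fixed when the listener is constructed. `JobStatusSketch`, `VertexSketch`, `CoordinatorHooks` and `DeActivatorSketch` are hypothetical stand-ins, not Flink classes; only the shape of the change (evaluate `noneMatch` in the constructor, read a final field in `jobStatusChanges`) is the point.

```java
import java.util.Collection;

/** Hypothetical job states; only RUNNING matters for this sketch. */
enum JobStatusSketch { RUNNING, FINISHED, FAILED }

/** Hypothetical view of an execution vertex exposing only what the check needs. */
interface VertexSketch {
    boolean isAnyOutputBlocking();
}

/** Hypothetical scheduler hooks; not Flink's CheckpointCoordinator API. */
interface CoordinatorHooks {
    void startCheckpointScheduler();

    void stopCheckpointScheduler();
}

/** Job status listener that evaluates the topology-dependent flag only once. */
class DeActivatorSketch {
    private final CoordinatorHooks coordinator;
    // The topology does not change after construction, so this is computed once.
    private final boolean allTasksOutputNonBlocking;

    DeActivatorSketch(CoordinatorHooks coordinator, Collection<? extends VertexSketch> vertices) {
        this.coordinator = coordinator;
        this.allTasksOutputNonBlocking =
                vertices.stream().noneMatch(VertexSketch::isAnyOutputBlocking);
    }

    void jobStatusChanges(JobStatusSketch newJobStatus) {
        if (newJobStatus == JobStatusSketch.RUNNING && allTasksOutputNonBlocking) {
            // Start periodic checkpoints only if there is no blocking edge.
            coordinator.startCheckpointScheduler();
        } else {
            // Any other state stops the trigger.
            coordinator.stopCheckpointScheduler();
        }
    }
}
```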
## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java: ## @@ -76,4 +76,8 @@ public void setSupportsConcurrentExecutionAttempts( boolean supportsConcurrentExecutionAttempts) { this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts; } + +public boolean isOutputOnlyAfterEndOfStream() { +return false; Review Comment: Let's add a comment indicating that this is just a default value, not something defined by PhysicalTransformation itself. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java: ## @@ -75,7 +75,7 @@ public class StreamNode { private KeySelector[] statePartitioners = new KeySelector[0]; private TypeSerializer stateKeySerializer; -private StreamOperatorFactory operatorFactory; +private @Nullable StreamOperatorFactory operatorFactory; Review Comment: Why
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
yunfengzhou-hub commented on code in PR #24272: URL: https://github.com/apache/flink/pull/24272#discussion_r1497209330 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -2303,6 +2307,102 @@ void testOutputFormatSupportConcurrentExecutionAttempts() { new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true); } +@Test +void testOutputOnlyAfterEndOfStream() { +final StreamExecutionEnvironment env = +StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + +final DataStream source = env.fromData(1, 2, 3).name("source"); +source.keyBy(x -> x) +.transform( +"map", +Types.INT, +new StreamOperatorWithConfigurableOperatorAttributes<>( +x -> x, +new OperatorAttributesBuilder() +.setOutputOnlyAfterEndOfStream(true) +.build())) +.sinkTo(new DiscardingSink<>()) +.name("sink"); + +final StreamGraph streamGraph = env.getStreamGraph(false); +Map nodeMap = new HashMap<>(); +for (StreamNode node : streamGraph.getStreamNodes()) { +nodeMap.put(node.getOperatorName(), node); +} +assertThat(nodeMap).hasSize(3); +assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream()).isFalse(); +assertThat(nodeMap.get("map").isOutputOnlyAfterEndOfStream()).isTrue(); +assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse(); + +assertThat(nodeMap.get("Source: source").getManagedMemoryOperatorScopeUseCaseWeights()) +.isEmpty(); + assertThat(nodeMap.get("map").getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(1); +assertThat(nodeMap.get("sink: Writer").getManagedMemoryOperatorScopeUseCaseWeights()) +.isEmpty(); + +JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); +Map vertexMap = new HashMap<>(); +for (JobVertex vertex : jobGraph.getVertices()) { +vertexMap.put(vertex.getName(), vertex); +} +assertThat(vertexMap).hasSize(2); +assertHasOutputPartitionType( +vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED); Review 
Comment: Tests have been added right after this line that disable operator chaining and check that the output edge of StreamOperatorWithConfigurableOperatorAttributes is blocking. Besides, there is only one output edge (to be exact, one IntermediateDataSet) no matter how many downstream operators there are, so this test case should be enough to verify that "all" downstream edges are blocking.
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
yunfengzhou-hub commented on code in PR #24272: URL: https://github.com/apache/flink/pull/24272#discussion_r1497166896 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorAttributesBuilder.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** The builder class for {@link OperatorAttributes}. */ +@Experimental +public class OperatorAttributesBuilder { + +private static final Logger LOG = LoggerFactory.getLogger(OperatorAttributesBuilder.class); +@Nullable private Boolean outputOnlyAfterEndOfStream = null; Review Comment: The current implementation makes it possible to distinguish cases where operator developers did not set an operator attribute from cases where they explicitly set its value. This can be helpful for debugging.
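A minimal sketch of the tri-state idea being discussed, assuming a nullable `Boolean` field where `null` means "never set" and `build()` resolves it to `false`. `AttributesSketch` and `AttributesBuilderSketch` are hypothetical and only mirror the concept; they are not Flink's `OperatorAttributes`/`OperatorAttributesBuilder`, and the `explicitOutputOnlyAfterEndOfStream` accessor is an illustrative addition.

```java
import java.util.Optional;

/** Hypothetical immutable attributes holder; mirrors the idea, not Flink's OperatorAttributes. */
final class AttributesSketch {
    private final boolean outputOnlyAfterEndOfStream;

    AttributesSketch(boolean outputOnlyAfterEndOfStream) {
        this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
    }

    boolean isOutputOnlyAfterEndOfStream() {
        return outputOnlyAfterEndOfStream;
    }
}

/** Hypothetical builder using a nullable Boolean as a "not set" marker. */
final class AttributesBuilderSketch {
    // null means the operator developer never called the setter.
    private Boolean outputOnlyAfterEndOfStream = null;

    AttributesBuilderSketch setOutputOnlyAfterEndOfStream(boolean value) {
        this.outputOnlyAfterEndOfStream = value;
        return this;
    }

    /** Lets callers (e.g. debug logging) tell "unset" apart from an explicit false. */
    Optional<Boolean> explicitOutputOnlyAfterEndOfStream() {
        return Optional.ofNullable(outputOnlyAfterEndOfStream);
    }

    AttributesSketch build() {
        // An unset value resolves to the conservative default of false.
        return new AttributesSketch(outputOnlyAfterEndOfStream != null && outputOnlyAfterEndOfStream);
    }
}
```

The simpler alternative raised in the thread would replace the `Boolean` with a primitive `boolean` initialized to `false`, giving up the "unset" state in exchange for less code.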
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
WencongLiu commented on code in PR #24272: URL: https://github.com/apache/flink/pull/24272#discussion_r1497054471 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java: ## @@ -41,33 +41,45 @@ public final class OneInputTransformationTranslator @Override public Collection translateForBatchInternal( final OneInputTransformation transformation, final Context context) { -KeySelector keySelector = transformation.getStateKeySelector(); Collection ids = translateInternal( transformation, transformation.getOperatorFactory(), transformation.getInputType(), -keySelector, +transformation.getStateKeySelector(), transformation.getStateKeyType(), context); -boolean isKeyed = keySelector != null; -if (isKeyed) { -BatchExecutionUtils.applyBatchExecutionSettings( -transformation.getId(), context, StreamConfig.InputRequirement.SORTED); -} + +maybeApplyBatchExecutionSettings(transformation, context); return ids; } @Override public Collection translateForStreamingInternal( final OneInputTransformation transformation, final Context context) { -return translateInternal( -transformation, -transformation.getOperatorFactory(), -transformation.getInputType(), -transformation.getStateKeySelector(), -transformation.getStateKeyType(), -context); +Collection ids = +translateInternal( +transformation, +transformation.getOperatorFactory(), +transformation.getInputType(), +transformation.getStateKeySelector(), +transformation.getStateKeyType(), +context); + +if (transformation.isOutputOnlyAfterEndOfStream()) { Review Comment: There may be some operators defined on a keyed stream that sort records internally. Maybe we should add another attribute to identify that case.
## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -2303,6 +2307,102 @@ void testOutputFormatSupportConcurrentExecutionAttempts() { new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true); } +@Test +void testOutputOnlyAfterEndOfStream() { +final StreamExecutionEnvironment env = +StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + +final DataStream source = env.fromData(1, 2, 3).name("source"); +source.keyBy(x -> x) +.transform( +"map", +Types.INT, +new StreamOperatorWithConfigurableOperatorAttributes<>( +x -> x, +new OperatorAttributesBuilder() +.setOutputOnlyAfterEndOfStream(true) +.build())) +.sinkTo(new DiscardingSink<>()) +.name("sink"); + +final StreamGraph streamGraph = env.getStreamGraph(false); +Map nodeMap = new HashMap<>(); +for (StreamNode node : streamGraph.getStreamNodes()) { +nodeMap.put(node.getOperatorName(), node); +} +assertThat(nodeMap).hasSize(3); +assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream()).isFalse(); +assertThat(nodeMap.get("map").isOutputOnlyAfterEndOfStream()).isTrue(); +assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse(); + +assertThat(nodeMap.get("Source: source").getManagedMemoryOperatorScopeUseCaseWeights()) +.isEmpty(); + assertThat(nodeMap.get("map").getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(1); +assertThat(nodeMap.get("sink: Writer").getManagedMemoryOperatorScopeUseCaseWeights()) +.isEmpty(); + +JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); +Map vertexMap = new HashMap<>(); +for (JobVertex vertex : jobGraph.getVertices()) { +vertexMap.put(vertex.getName(), vertex); +} +assertThat(vertexMap).hasSize(2); +assertHasOutputPartitionType( +vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED); Review Comment: I think we should confirm that all edges from `StreamOperatorWithConfigurableOperatorAttributes` to its downstream nodes are `BLOCKING`. In this test case,
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
mohitjain2504 commented on code in PR #24272: URL: https://github.com/apache/flink/pull/24272#discussion_r1496989288 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorAttributesBuilder.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** The builder class for {@link OperatorAttributes}. */ +@Experimental +public class OperatorAttributesBuilder { + +private static final Logger LOG = LoggerFactory.getLogger(OperatorAttributesBuilder.class); +@Nullable private Boolean outputOnlyAfterEndOfStream = null; Review Comment: Do we want to make this value false by default and remove the `@Nullable`?
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
Sxnan commented on PR #24272: URL: https://github.com/apache/flink/pull/24272#issuecomment-1955959322 @yunfengzhou-hub Thanks for the update. LGTM!
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
yunfengzhou-hub commented on PR #24272: URL: https://github.com/apache/flink/pull/24272#issuecomment-1955843349 Thanks for the comments, @Sxnan and @mohitjain2504. I have updated the PR according to the comments.
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
Sxnan commented on code in PR #24272: URL: https://github.com/apache/flink/pull/24272#discussion_r1495348233 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java: ## @@ -780,6 +780,9 @@ private void onTaskExecutionStateUpdate( // only notifies FINISHED and FAILED states which are needed at the moment. // can be refined in FLINK-14233 after the actions are factored out from ExecutionGraph. switch (taskExecutionState.getExecutionState()) { +case RUNNING: Review Comment: We should update the comment above. ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -2333,6 +2334,12 @@ void testOutputOnlyAfterEndOfStream() { assertThat(nodeMap.get("map").isOutputOnlyAfterEndOfStream()).isTrue(); assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse(); +assertThat(nodeMap.get("Source: source").getManagedMemoryOperatorScopeUseCaseWeights()) Review Comment: We may want to test `TwoInputTransformation` and `AbstractInputTransformation` as well. ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java: ## @@ -516,7 +516,7 @@ public void failJobDueToTaskFailure( if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { // the periodic checkpoint scheduler is activated and deactivated as a result of // job status changes (running -> on, all other states -> off) - registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator()); + registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator(tasks)); Review Comment: We should update the comment above so that it is accurate after the change. 
## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -2309,6 +2309,7 @@ void testOutputFormatSupportConcurrentExecutionAttempts() { void testOutputOnlyAfterEndOfStream() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); +env.disableOperatorChaining(); Review Comment: It would be better to also test the case where operator chaining is enabled. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java: ## @@ -38,9 +38,12 @@ */ @PublicEvolving public class GlobalWindows extends WindowAssigner { -private static final long serialVersionUID = 1L; +private static final long serialVersionUID = 2L; Review Comment: IIUC, we don't need to update the `serialVersionUID`, since adding a field is a compatible change according to the [doc](https://docs.oracle.com/javase/7/docs/platform/serialization/spec/version.html#6678). We just need to return NeverTrigger from the `getDefaultTrigger` method if defaultTrigger is null, so that the behavior stays backward compatible.
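A minimal sketch of the suggested approach, assuming a class that gains a new field while keeping `serialVersionUID = 1L`. Under Java serialization, a field missing from an older stream is initialized to its default value (`null` for references), so a getter with a fallback preserves the old behavior. `TriggerSketch` and `GlobalWindowsSketch` are hypothetical stand-ins for Flink's `Trigger` and `GlobalWindows`; since two class versions cannot coexist in one program, the test approximates the old-data case by round-tripping an instance whose field is `null`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical trigger type; stands in for Flink's Trigger hierarchy. */
interface TriggerSketch extends Serializable {
    String name();
}

/** Hypothetical assigner: a field added in a new version, serialVersionUID left unchanged. */
class GlobalWindowsSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // New field. Streams written by the old version do not contain it,
    // so deserialization leaves it null.
    private final TriggerSketch defaultTrigger;

    GlobalWindowsSketch(TriggerSketch defaultTrigger) {
        this.defaultTrigger = defaultTrigger;
    }

    TriggerSketch getDefaultTrigger() {
        // Fall back to the old behavior (a never-firing trigger) when the field is absent.
        return defaultTrigger != null ? defaultTrigger : () -> "NeverTrigger";
    }

    /** Serializes and deserializes an instance, wrapping checked exceptions. */
    static GlobalWindowsSketch roundTrip(GlobalWindowsSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (GlobalWindowsSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```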
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
mohitjain2504 commented on code in PR #24272: URL: https://github.com/apache/flink/pull/24272#discussion_r1479254174 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java: ## @@ -31,19 +35,31 @@ public class CheckpointCoordinatorDeActivator implements JobStatusListener { private final CheckpointCoordinator coordinator; +private final Map tasks; -public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) { +public CheckpointCoordinatorDeActivator( +CheckpointCoordinator coordinator, Map tasks) { this.coordinator = checkNotNull(coordinator); +this.tasks = checkNotNull(tasks); } @Override public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) { -if (newJobStatus == JobStatus.RUNNING) { -// start the checkpoint scheduler +if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking()) { +// start the checkpoint scheduler if there is no blocking edge coordinator.startCheckpointScheduler(); } else { // anything else should stop the trigger for now coordinator.stopCheckpointScheduler(); } } + +private boolean allTasksOutputNonBlocking() { +for (ExecutionJobVertex vertex : tasks.values()) { Review Comment: This can also be written as:
```java
return tasks.values().stream()
        .noneMatch(vertex -> vertex.getJobVertex().isAnyOutputBlocking());
```
Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
flinkbot commented on PR #24272: URL: https://github.com/apache/flink/pull/24272#issuecomment-1928706922 ## CI report: * 1f397403e3268900486f84b145452ef5a3c739f4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]
yunfengzhou-hub opened a new pull request, #24272: URL: https://github.com/apache/flink/pull/24272

## What is the purpose of the change

This pull request adds support for EndOfStreamTrigger and the isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment.

## Brief change log

- Introduces OperatorAttributes with the isOutputOnlyAfterEndOfStream attribute
- Schedules operators with isOutputOnlyAfterEndOfStream=true to run in blocking mode
- Makes WindowOperator and StreamSortOperator return isOutputOnlyAfterEndOfStream=true in certain cases

## Verifying this change

- This change adds unit tests for OperatorAttributes and the corresponding blocking mode changes.
- The change to WindowOperator and StreamSortOperator is covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs
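A minimal sketch of the scheduling decision in the change log, assuming that a producer flagged with isOutputOnlyAfterEndOfStream=true gets a blocking output while other producers keep a pipelined one. This matches the partition types asserted in the review threads (PIPELINED_BOUNDED for the source, BLOCKING after the flagged operator). `PartitionTypeSketch`, `OperatorNodeSketch` and `PartitionTypeChooserSketch` are hypothetical names; Flink's actual logic lives in its job graph generation and uses `ResultPartitionType`.

```java
/** Hypothetical subset of partition types; Flink's real enum is ResultPartitionType. */
enum PartitionTypeSketch { PIPELINED_BOUNDED, BLOCKING }

/** Hypothetical operator node carrying only the attribute relevant here. */
final class OperatorNodeSketch {
    private final String name;
    private final boolean outputOnlyAfterEndOfStream;

    OperatorNodeSketch(String name, boolean outputOnlyAfterEndOfStream) {
        this.name = name;
        this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
    }

    String getName() {
        return name;
    }

    boolean isOutputOnlyAfterEndOfStream() {
        return outputOnlyAfterEndOfStream;
    }
}

final class PartitionTypeChooserSketch {
    private PartitionTypeChooserSketch() {}

    static PartitionTypeSketch outputPartitionType(OperatorNodeSketch producer) {
        // An operator that emits records only after end of input gains nothing from
        // pipelined output, so its consumers can be deployed later in blocking mode.
        return producer.isOutputOnlyAfterEndOfStream()
                ? PartitionTypeSketch.BLOCKING
                : PartitionTypeSketch.PIPELINED_BOUNDED;
    }
}
```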