Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-28 Thread via GitHub


xintongsong commented on PR #24272:
URL: https://github.com/apache/flink/pull/24272#issuecomment-1968909988

   @mohitjain2504
   It's merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-28 Thread via GitHub


mohitjain2504 commented on PR #24272:
URL: https://github.com/apache/flink/pull/24272#issuecomment-1968895813

   What is the reason for closing? Did I miss something? 





Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-28 Thread via GitHub


xintongsong closed pull request #24272: [FLINK-34371][runtime] Support 
EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to 
optimize task deployment
URL: https://github.com/apache/flink/pull/24272





Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-27 Thread via GitHub


yunfengzhou-hub commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1502520887


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java:
##
@@ -61,6 +61,8 @@ public enum CheckpointFailureReason {
 
 CHECKPOINT_COORDINATOR_SHUTDOWN(false, "CheckpointCoordinator shutdown."),
 
+CHECKPOINT_COORDINATOR_START(false, "Checkpoint Coordinator is starting."),

Review Comment:
   The message had been used to resolve the following race condition:
   - A test case attempts to trigger stop-with-savepoint on a Flink job, during 
which CheckpointCoordinator#stopCheckpointScheduler() would be invoked before 
a savepoint is created.
   - The behavior introduced in this PR would check and invoke 
startCheckpointScheduler() if the running tasks contain no blocking edge and 
the scheduler is not started yet, and the status of stop-with-savepoint also 
meets this condition.
   - startCheckpointScheduler() would invoke stopCheckpointScheduler() first, 
which would cancel all ongoing and pending checkpoints and savepoints. The 
savepoint triggered by the test case is aborted because of this, and then the 
test case fails accordingly.
   
   As a temporary solution, I have changed startCheckpointScheduler() so that 
it does not invoke stopCheckpointScheduler() if the scheduler is not started 
yet, which spares savepoints from being canceled in this process. Given that 
the race condition above would not occur once all tasks enter the running 
status, this solution should be acceptable for now.
   
   For the long term, I created FLINK-34519 to record the problem and some basic 
ideas on adapting the checkpointing mechanism to batch/streaming unification 
optimizations.
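
   The temporary solution described above can be sketched as follows. This is 
only a minimal model under stated assumptions: `SketchCheckpointScheduler` and 
all of its methods are hypothetical stand-ins, not Flink's 
CheckpointCoordinator, and pending savepoints are reduced to a list of names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a checkpoint scheduler; not Flink's CheckpointCoordinator. */
class SketchCheckpointScheduler {
    private boolean started = false;
    private final List<String> pending = new ArrayList<>();

    /** Registers a pending savepoint, as a stop-with-savepoint request would. */
    void triggerSavepoint(String name) {
        pending.add(name);
    }

    /** Stops periodic scheduling and aborts everything that is still pending. */
    void stop() {
        started = false;
        pending.clear();
    }

    /** Starts periodic scheduling; prior state is only reset if the scheduler was running. */
    void start() {
        if (started) {
            // Only a running scheduler has timers that need to be canceled first.
            stop();
        }
        started = true;
    }

    boolean isStarted() {
        return started;
    }

    List<String> pendingSavepoints() {
        return pending;
    }
}
```

   In this model, calling `stop()`, then `triggerSavepoint(...)`, then 
`start()` leaves the savepoint pending, whereas an unconditional `stop()` 
inside `start()` would have aborted it.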






Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-26 Thread via GitHub


yunfengzhou-hub commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1502520887


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java:
##
@@ -61,6 +61,8 @@ public enum CheckpointFailureReason {
 
 CHECKPOINT_COORDINATOR_SHUTDOWN(false, "CheckpointCoordinator shutdown."),
 
+CHECKPOINT_COORDINATOR_START(false, "Checkpoint Coordinator is starting."),

Review Comment:
   I have added in this PR a detailed failure reason denoting cases when a 
savepoint fails due to the existence of a blocking edge, which should be enough 
for now.
   
   For the long term, I created FLINK-34519 to record the problem and some basic 
ideas on adapting the checkpointing mechanism to batch/streaming unification 
optimizations.






Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-26 Thread via GitHub


yunfengzhou-hub commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1502324525


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java:
##
@@ -76,4 +76,8 @@ public void setSupportsConcurrentExecutionAttempts(
 boolean supportsConcurrentExecutionAttempts) {
 this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
 }
+
+public boolean isOutputOnlyAfterEndOfStream() {
+return false;

Review Comment:
   I just realized that this method is only used in 
xxxInputTransformationTranslator, and in all these usages the calling methods 
know the concrete subclass of the physical transformation. Thus I removed this 
method and kept only the implementations in the subclasses.






Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-25 Thread via GitHub


xintongsong commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1501806777


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorAttributesBuilder.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/** The builder class for {@link OperatorAttributes}. */
+@Experimental
+public class OperatorAttributesBuilder {
+
+private static final Logger LOG = LoggerFactory.getLogger(OperatorAttributesBuilder.class);
+@Nullable private Boolean outputOnlyAfterEndOfStream = null;

Review Comment:
   I agree with @mohitjain2504's opinion. In this particular case, I don't think 
it's necessary to distinguish the two cases. It might be nicer to keep it simple.



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java:
##
@@ -31,19 +35,27 @@
 public class CheckpointCoordinatorDeActivator implements JobStatusListener {
 
 private final CheckpointCoordinator coordinator;
+private final Map tasks;
 
-public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
+public CheckpointCoordinatorDeActivator(
+CheckpointCoordinator coordinator, Map tasks) {
 this.coordinator = checkNotNull(coordinator);
+this.tasks = checkNotNull(tasks);
 }
 
 @Override
 public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
-if (newJobStatus == JobStatus.RUNNING) {
-// start the checkpoint scheduler
+if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking()) {
+// start the checkpoint scheduler if there is no blocking edge
 coordinator.startCheckpointScheduler();
 } else {
 // anything else should stop the trigger for now
 coordinator.stopCheckpointScheduler();
 }
 }
+
+private boolean allTasksOutputNonBlocking() {
+return tasks.values().stream()
+.noneMatch(vertex -> vertex.getJobVertex().isAnyOutputBlocking());

Review Comment:
   The value of this never changes. It should be calculated at initialization, 
rather than every time `jobStatusChanges` is called.
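
   A minimal sketch of computing the flag once at initialization, assuming the 
set of vertices and their output types do not change afterwards. 
`SketchVertex` and `SketchDeActivator` are hypothetical stand-ins for 
illustration, not the classes touched by this PR.

```java
import java.util.Collection;

/** Hypothetical stand-in for a job vertex; only the blocking flag matters here. */
class SketchVertex {
    private final boolean anyOutputBlocking;

    SketchVertex(boolean anyOutputBlocking) {
        this.anyOutputBlocking = anyOutputBlocking;
    }

    boolean isAnyOutputBlocking() {
        return anyOutputBlocking;
    }
}

/** Hypothetical listener that decides once, at construction, whether checkpointing may start. */
class SketchDeActivator {
    private final boolean allTasksOutputNonBlocking;

    SketchDeActivator(Collection<SketchVertex> vertices) {
        // Evaluated a single time; the topology is assumed to be fixed after construction.
        this.allTasksOutputNonBlocking =
                vertices.stream().noneMatch(SketchVertex::isAnyOutputBlocking);
    }

    /** Returns true if the scheduler should be started for the given job status. */
    boolean shouldStartScheduler(boolean jobRunning) {
        return jobRunning && allTasksOutputNonBlocking;
    }
}
```

   Each status change then only reads a final field instead of streaming over 
all vertices again.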



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java:
##
@@ -61,6 +61,8 @@ public enum CheckpointFailureReason {
 
 CHECKPOINT_COORDINATOR_SHUTDOWN(false, "CheckpointCoordinator shutdown."),
 
+CHECKPOINT_COORDINATOR_START(false, "Checkpoint Coordinator is starting."),

Review Comment:
   The message is confusing.
   1. According to how it is used, it's not about starting the checkpoint 
coordinator, but about canceling potential prior timers before scheduling a new 
one.
   2. I don't really get how this could happen. Does it come from first stopping 
the checkpoint scheduler and then starting it again? If so, shouldn't the timers 
already have been canceled when the checkpoint scheduler was stopped?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java:
##
@@ -76,4 +76,8 @@ public void setSupportsConcurrentExecutionAttempts(
 boolean supportsConcurrentExecutionAttempts) {
 this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
 }
+
+public boolean isOutputOnlyAfterEndOfStream() {
+return false;

Review Comment:
   Let's add a comment indicating that this is just a default value, and is not 
by definition of PhysicalTransformation.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java:
##
@@ -75,7 +75,7 @@ public class StreamNode {
 private KeySelector[] statePartitioners = new KeySelector[0];
 private TypeSerializer stateKeySerializer;
 
-private StreamOperatorFactory operatorFactory;
+private @Nullable StreamOperatorFactory operatorFactory;

Review Comment:
   Why 

Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1497209330


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -2303,6 +2307,102 @@ void testOutputFormatSupportConcurrentExecutionAttempts() {
 new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true);
 }
 
+@Test
+void testOutputOnlyAfterEndOfStream() {
+final StreamExecutionEnvironment env =
+StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+
+final DataStream source = env.fromData(1, 2, 3).name("source");
+source.keyBy(x -> x)
+.transform(
+"map",
+Types.INT,
+new StreamOperatorWithConfigurableOperatorAttributes<>(
+x -> x,
+new OperatorAttributesBuilder()
+.setOutputOnlyAfterEndOfStream(true)
+.build()))
+.sinkTo(new DiscardingSink<>())
+.name("sink");
+
+final StreamGraph streamGraph = env.getStreamGraph(false);
+Map nodeMap = new HashMap<>();
+for (StreamNode node : streamGraph.getStreamNodes()) {
+nodeMap.put(node.getOperatorName(), node);
+}
+assertThat(nodeMap).hasSize(3);
+assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream()).isFalse();
+assertThat(nodeMap.get("map").isOutputOnlyAfterEndOfStream()).isTrue();
+assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse();
+
+assertThat(nodeMap.get("Source: source").getManagedMemoryOperatorScopeUseCaseWeights())
+.isEmpty();
+assertThat(nodeMap.get("map").getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(1);
+assertThat(nodeMap.get("sink: Writer").getManagedMemoryOperatorScopeUseCaseWeights())
+.isEmpty();
+
+JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+Map vertexMap = new HashMap<>();
+for (JobVertex vertex : jobGraph.getVertices()) {
+vertexMap.put(vertex.getName(), vertex);
+}
+assertThat(vertexMap).hasSize(2);
+assertHasOutputPartitionType(
+vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED);

Review Comment:
   Tests have been added right after this line, disabling operator chaining and 
checking that the output edge of 
StreamOperatorWithConfigurableOperatorAttributes is blocking. Besides, there 
will be only one output edge (to be exact, one IntermediateDataSet) no matter 
how many downstream operators there are, so this test case should be enough to 
verify that "all" downstream edges are blocking.






Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-21 Thread via GitHub


yunfengzhou-hub commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1497166896


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorAttributesBuilder.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/** The builder class for {@link OperatorAttributes}. */
+@Experimental
+public class OperatorAttributesBuilder {
+
+private static final Logger LOG = LoggerFactory.getLogger(OperatorAttributesBuilder.class);
+@Nullable private Boolean outputOnlyAfterEndOfStream = null;

Review Comment:
   The current implementation helps to distinguish cases when operator 
developers did not set an operator attribute and cases when they explicitly set 
the attribute value. This could be helpful for debugging purposes.
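
   The distinction described above can be illustrated with a simplified 
builder. This is a sketch under stated assumptions: `SketchAttributes`, 
`SketchAttributesBuilder`, and `isExplicitlySet()` are hypothetical names, and 
the fallback to `false` for an unset value is an assumption of the sketch.

```java
/** Hypothetical, simplified attributes holder; not Flink's OperatorAttributes. */
class SketchAttributes {
    private final boolean outputOnlyAfterEndOfStream;

    SketchAttributes(boolean outputOnlyAfterEndOfStream) {
        this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
    }

    boolean isOutputOnlyAfterEndOfStream() {
        return outputOnlyAfterEndOfStream;
    }
}

/** Hypothetical builder that remembers whether the attribute was ever set. */
class SketchAttributesBuilder {
    // null means "never set by the operator developer".
    private Boolean outputOnlyAfterEndOfStream = null;

    SketchAttributesBuilder setOutputOnlyAfterEndOfStream(boolean value) {
        this.outputOnlyAfterEndOfStream = value;
        return this;
    }

    /** True only if the setter was called, regardless of the value passed. */
    boolean isExplicitlySet() {
        return outputOnlyAfterEndOfStream != null;
    }

    SketchAttributes build() {
        // An unset attribute falls back to false (assumed default for this sketch).
        return new SketchAttributes(
                outputOnlyAfterEndOfStream != null && outputOnlyAfterEndOfStream);
    }
}
```

   With a plain `boolean` field defaulting to `false`, `build()` would behave 
the same, but the "unset" and "explicitly false" cases could no longer be told 
apart.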






Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-21 Thread via GitHub


WencongLiu commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1497054471


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java:
##
@@ -41,33 +41,45 @@ public final class OneInputTransformationTranslator
 @Override
 public Collection translateForBatchInternal(
 final OneInputTransformation transformation, final Context context) {
-KeySelector keySelector = transformation.getStateKeySelector();
 Collection ids =
 translateInternal(
 transformation,
 transformation.getOperatorFactory(),
 transformation.getInputType(),
-keySelector,
+transformation.getStateKeySelector(),
 transformation.getStateKeyType(),
 context);
-boolean isKeyed = keySelector != null;
-if (isKeyed) {
-BatchExecutionUtils.applyBatchExecutionSettings(
-transformation.getId(), context, StreamConfig.InputRequirement.SORTED);
-}
+
+maybeApplyBatchExecutionSettings(transformation, context);
 
 return ids;
 }
 
 @Override
 public Collection translateForStreamingInternal(
 final OneInputTransformation transformation, final Context context) {
-return translateInternal(
-transformation,
-transformation.getOperatorFactory(),
-transformation.getInputType(),
-transformation.getStateKeySelector(),
-transformation.getStateKeyType(),
-context);
+Collection ids =
+translateInternal(
+transformation,
+transformation.getOperatorFactory(),
+transformation.getInputType(),
+transformation.getStateKeySelector(),
+transformation.getStateKeyType(),
+context);
+
+if (transformation.isOutputOnlyAfterEndOfStream()) {

Review Comment:
   There may be some operators defined on keyed stream that sort records 
internally. Maybe we should add another attribute to identify that case.



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -2303,6 +2307,102 @@ void testOutputFormatSupportConcurrentExecutionAttempts() {
 new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true);
 }
 
+@Test
+void testOutputOnlyAfterEndOfStream() {
+final StreamExecutionEnvironment env =
+StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+
+final DataStream source = env.fromData(1, 2, 3).name("source");
+source.keyBy(x -> x)
+.transform(
+"map",
+Types.INT,
+new StreamOperatorWithConfigurableOperatorAttributes<>(
+x -> x,
+new OperatorAttributesBuilder()
+.setOutputOnlyAfterEndOfStream(true)
+.build()))
+.sinkTo(new DiscardingSink<>())
+.name("sink");
+
+final StreamGraph streamGraph = env.getStreamGraph(false);
+Map nodeMap = new HashMap<>();
+for (StreamNode node : streamGraph.getStreamNodes()) {
+nodeMap.put(node.getOperatorName(), node);
+}
+assertThat(nodeMap).hasSize(3);
+assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream()).isFalse();
+assertThat(nodeMap.get("map").isOutputOnlyAfterEndOfStream()).isTrue();
+assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse();
+
+assertThat(nodeMap.get("Source: source").getManagedMemoryOperatorScopeUseCaseWeights())
+.isEmpty();
+assertThat(nodeMap.get("map").getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(1);
+assertThat(nodeMap.get("sink: Writer").getManagedMemoryOperatorScopeUseCaseWeights())
+.isEmpty();
+
+JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+Map vertexMap = new HashMap<>();
+for (JobVertex vertex : jobGraph.getVertices()) {
+vertexMap.put(vertex.getName(), vertex);
+}
+assertThat(vertexMap).hasSize(2);
+assertHasOutputPartitionType(
+vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED);

Review Comment:
   I think we should confirm that all edges connected from downstream nodes of 
`StreamOperatorWithConfigurableOperatorAttributes` are `BLOCKING`. In this test 
case, 

Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-20 Thread via GitHub


mohitjain2504 commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1496989288


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorAttributesBuilder.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/** The builder class for {@link OperatorAttributes}. */
+@Experimental
+public class OperatorAttributesBuilder {
+
+private static final Logger LOG = LoggerFactory.getLogger(OperatorAttributesBuilder.class);
+@Nullable private Boolean outputOnlyAfterEndOfStream = null;

Review Comment:
   Do we want to make this value false by default and remove the `@Nullable`? 






Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-20 Thread via GitHub


Sxnan commented on PR #24272:
URL: https://github.com/apache/flink/pull/24272#issuecomment-1955959322

   @yunfengzhou-hub Thanks for the update. LGTM!





Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-20 Thread via GitHub


yunfengzhou-hub commented on PR #24272:
URL: https://github.com/apache/flink/pull/24272#issuecomment-1955843349

   Thanks for the comments @Sxnan @mohitjain2504 . I have updated the PR 
according to the comments.





Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-20 Thread via GitHub


Sxnan commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1495348233


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##
@@ -780,6 +780,9 @@ private void onTaskExecutionStateUpdate(
 // only notifies FINISHED and FAILED states which are needed at the moment.
 // can be refined in FLINK-14233 after the actions are factored out from ExecutionGraph.
 switch (taskExecutionState.getExecutionState()) {
+case RUNNING:

Review Comment:
   We should update the comment above.



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -2333,6 +2334,12 @@ void testOutputOnlyAfterEndOfStream() {
 assertThat(nodeMap.get("map").isOutputOnlyAfterEndOfStream()).isTrue();
 assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse();
 
+assertThat(nodeMap.get("Source: source").getManagedMemoryOperatorScopeUseCaseWeights())

Review Comment:
   We may want to test `TwoInputTransformation` and 
`AbstractInputTransformation` as well.



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##
@@ -516,7 +516,7 @@ public void failJobDueToTaskFailure(
 if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
 // the periodic checkpoint scheduler is activated and deactivated as a result of
 // job status changes (running -> on, all other states -> off)
-registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
+registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator(tasks));

Review Comment:
   We should update the comment above so that it is accurate after the change.



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -2309,6 +2309,7 @@ void testOutputFormatSupportConcurrentExecutionAttempts() {
 void testOutputOnlyAfterEndOfStream() {
 final StreamExecutionEnvironment env =
 StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+env.disableOperatorChaining();

Review Comment:
   It's better to also test the case where operator chaining is enabled.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java:
##
@@ -38,9 +38,12 @@
  */
 @PublicEvolving
 public class GlobalWindows extends WindowAssigner {
-private static final long serialVersionUID = 1L;
+private static final long serialVersionUID = 2L;

Review Comment:
   IIUC, we don't need to update the `serialVersionUID`, as adding a field is 
not incompatible, according to the 
[doc](https://docs.oracle.com/javase/7/docs/platform/serialization/spec/version.html#6678).
 We just need to return NeverTrigger in the `getDefaultTrigger` method if 
defaultTrigger is null so that the behavior is backward compatible.
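
   A minimal sketch of the backward-compatible getter suggested above. It 
relies on the Java serialization rule that a field missing from an older 
serialized stream is initialized to its default value (`null` for object 
references). `SketchTrigger`, `SketchNeverTrigger`, and `SketchGlobalWindows` 
are hypothetical stand-ins, not the Flink classes.

```java
import java.io.Serializable;

/** Hypothetical stand-in for a trigger. */
interface SketchTrigger extends Serializable {
    String name();
}

/** Hypothetical stand-in for a trigger that never fires. */
class SketchNeverTrigger implements SketchTrigger {
    private static final long serialVersionUID = 1L;

    @Override
    public String name() {
        return "never";
    }
}

/** Hypothetical window assigner that gained a field without changing its serialVersionUID. */
class SketchGlobalWindows implements Serializable {
    // Unchanged: adding a field is a compatible change under Java serialization.
    private static final long serialVersionUID = 1L;

    // Added later; instances written by the older class version deserialize this as null.
    private final SketchTrigger defaultTrigger;

    SketchGlobalWindows(SketchTrigger defaultTrigger) {
        this.defaultTrigger = defaultTrigger;
    }

    /** Falls back to the old behavior when the field is absent. */
    SketchTrigger getDefaultTrigger() {
        return defaultTrigger == null ? new SketchNeverTrigger() : defaultTrigger;
    }
}
```

   Passing `null` to the constructor here simulates an instance restored from 
data written before the field existed.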






Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-05 Thread via GitHub


mohitjain2504 commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1479254174


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java:
##
@@ -31,19 +35,31 @@
 public class CheckpointCoordinatorDeActivator implements JobStatusListener {
 
 private final CheckpointCoordinator coordinator;
+private final Map tasks;
 
-public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
+public CheckpointCoordinatorDeActivator(
+CheckpointCoordinator coordinator, Map tasks) {
 this.coordinator = checkNotNull(coordinator);
+this.tasks = checkNotNull(tasks);
 }
 
 @Override
 public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
-if (newJobStatus == JobStatus.RUNNING) {
-// start the checkpoint scheduler
+if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking()) {
+// start the checkpoint scheduler if there is no blocking edge
 coordinator.startCheckpointScheduler();
 } else {
 // anything else should stop the trigger for now
 coordinator.stopCheckpointScheduler();
 }
 }
+
+private boolean allTasksOutputNonBlocking() {
+for (ExecutionJobVertex vertex : tasks.values()) {

Review Comment:
   This can also be written as:
   ```
   return tasks.values().stream()
           .noneMatch(vertex -> vertex.getJobVertex().isAnyOutputBlocking());
   ```






Re: [PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-05 Thread via GitHub


flinkbot commented on PR #24272:
URL: https://github.com/apache/flink/pull/24272#issuecomment-1928706922

   
   ## CI report:
   
   * 1f397403e3268900486f84b145452ef5a3c739f4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-34371][runtime] Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment [flink]

2024-02-05 Thread via GitHub


yunfengzhou-hub opened a new pull request, #24272:
URL: https://github.com/apache/flink/pull/24272

   ## What is the purpose of the change
   
   This pull request adds support for EndOfStreamTrigger and 
isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment.
   
   
   ## Brief change log
   
   - Introduces OperatorAttributes with isOutputOnlyAfterEndOfStream attribute
   - Schedules operators with isOutputOnlyAfterEndOfStream=true to run in 
blocking mode
   - Makes WindowOperator and StreamSortOperator return 
isOutputOnlyAfterEndOfStream=true in certain cases
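
   As a rough illustration of the second item, the mapping from the attribute 
to an output type can be modeled as below. This is a simplified sketch, not the 
logic in this PR: `SketchPartitionType` and `SketchPartitionPlanner` are 
hypothetical names, and the real decision in the job graph generator involves 
more inputs than a single flag.

```java
/** Hypothetical, reduced set of output types used only by this sketch. */
enum SketchPartitionType {
    PIPELINED_BOUNDED,
    BLOCKING
}

/** Hypothetical planner that picks an output type from the operator attribute. */
class SketchPartitionPlanner {
    /**
     * An operator that emits records only after end of stream gets a blocking output,
     * so its consumers do not need to be deployed while it is still consuming input.
     */
    static SketchPartitionType outputTypeFor(boolean outputOnlyAfterEndOfStream) {
        return outputOnlyAfterEndOfStream
                ? SketchPartitionType.BLOCKING
                : SketchPartitionType.PIPELINED_BOUNDED;
    }
}
```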
   
   ## Verifying this change
   
   - This change adds unit tests for OperatorAttributes and the corresponding 
blocking mode changes.
   - The changes to WindowOperator and StreamSortOperator are covered by 
existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   

