[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model
kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r614264965

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java ##
@@ -202,8 +202,7 @@ default void notifyCheckpointAborted(long checkpointId) {}
 * target TaskManager. The future is completed exceptionally if the event cannot be sent.
 * That includes situations where the target task is not running.
 */
-CompletableFuture sendEvent(OperatorEvent evt, int targetSubtask)
-throws TaskNotRunningException;
+CompletableFuture sendEvent(OperatorEvent evt, int targetSubtask);

Review comment:
I was wondering whether this signature mixes two different requirements:

* `void transferState(OperatorStateEvent evt) throws TaskNotRunningException` (throwing is optional but might be useful). A sending failure fails over the subtask. This effectively means that the coordinator wants to transfer part of its state to the subtask.
* ` CompletableFuture sendEvent(OperatorEvent evt, Duration timeout)`: sends an event without an exactly-once guarantee. The author is responsible for dealing with failures, hence the timeout parameter. The event neither contributes to nor belongs to the state on either side.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
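To make the proposed split concrete, here is a minimal sketch of the two requirements as separate operations. `transferState` and the timeout-taking `sendEvent` come from the comment and are not an existing Flink API; the `SplitGateway` and `InMemoryGateway` names, the use of `String` for events, and `CompletableFuture.orTimeout` as the timeout mechanism are assumptions for illustration only.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical checked exception, named after the one discussed in the review.
class TaskNotRunningException extends Exception {
    TaskNotRunningException(String message) {
        super(message);
    }
}

// Hypothetical API separating the two requirements from the comment.
interface SplitGateway {
    // State transfer: fails synchronously when the subtask is not running;
    // the caller would then fail over the subtask.
    void transferState(String stateEvent) throws TaskNotRunningException;

    // Best-effort event: no exactly-once guarantee; the caller handles failures,
    // so a timeout is part of the signature.
    CompletableFuture<Void> sendEvent(String event, Duration timeout);
}

// Toy implementation for a single subtask that is either running or not.
class InMemoryGateway implements SplitGateway {
    private final boolean running;
    final List<String> delivered = new ArrayList<>();

    InMemoryGateway(boolean running) {
        this.running = running;
    }

    @Override
    public void transferState(String stateEvent) throws TaskNotRunningException {
        if (!running) {
            throw new TaskNotRunningException("subtask not running, state not transferred");
        }
        delivered.add(stateEvent);
    }

    @Override
    public CompletableFuture<Void> sendEvent(String event, Duration timeout) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        if (running) {
            delivered.add(event);
            ack.complete(null);
        }
        // An event that is never acknowledged fails with a TimeoutException.
        return ack.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

The design point is that the state-transfer path surfaces failure immediately to the caller, while the best-effort path bounds how long the caller waits and leaves recovery to the coordinator author.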
kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r613244353

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java ##
@@ -96,42 +94,44 @@ public boolean isShut() {
 * (because it gets dropped through a call to {@link #reset()} or {@link #resetForTask(int)},
 * then the returned future till be completed exceptionally.
 */
-public CompletableFuture sendEvent(
-SerializedValue event, int subtask) {
-synchronized (lock) {
-if (!shut) {
-return eventSender.apply(event, subtask);
-}
-
-final List eventsForTask =
-blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
-final CompletableFuture future = new CompletableFuture<>();
-eventsForTask.add(new BlockedEvent(event, subtask, future));
-return future;
+public void sendEvent(
+SerializedValue event,
+int subtask,
+CompletableFuture result) {
+checkRunsInMainThread();
+
+if (!shut) {
+final CompletableFuture ack = eventSender.apply(event, subtask);

Review comment:
Skimming through `sendOperatorEvent`, I think it is thread-safe for `SubtaskGateway`. The key points are:

* After `subtaskReady` and `RUNNING` (or `INITIALIZING`), `assignedResource` is visible anyway, so no event is lost by not being sent.
* Upon failure, which kind of failure it is does not actually matter.

That said, this "thread safety" could be accidental.
kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r613137597

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java ##
@@ -96,42 +94,44 @@ public boolean isShut() {
 * (because it gets dropped through a call to {@link #reset()} or {@link #resetForTask(int)},
 * then the returned future till be completed exceptionally.
 */
-public CompletableFuture sendEvent(
-SerializedValue event, int subtask) {
-synchronized (lock) {
-if (!shut) {
-return eventSender.apply(event, subtask);
-}
-
-final List eventsForTask =
-blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
-final CompletableFuture future = new CompletableFuture<>();
-eventsForTask.add(new BlockedEvent(event, subtask, future));
-return future;
+public void sendEvent(
+SerializedValue event,
+int subtask,
+CompletableFuture result) {
+checkRunsInMainThread();
+
+if (!shut) {
+final CompletableFuture ack = eventSender.apply(event, subtask);

Review comment:
I am actually neutral about either threading model in `OperatorCoordinator`; it is purely an implementation detail. Whichever we choose, a rendezvous between checkpoint completion and event sending is unavoidable.
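The rendezvous mentioned here can be sketched with a simplified, thread-confined valve: it is shut when a checkpoint is triggered, buffers events while shut, and releases them in order when the checkpoint completes. `MainThreadValve`, its methods, and the use of the constructing thread as the "main thread" are hypothetical; this is not Flink's `OperatorEventValve`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified valve; not Flink's OperatorEventValve.
// All calls must come from the thread that created it (the "main thread").
class MainThreadValve {
    private static final class Blocked {
        final String event;
        final int subtask;

        Blocked(String event, int subtask) {
            this.event = event;
            this.subtask = subtask;
        }
    }

    private final Thread mainThread = Thread.currentThread();
    private final BiConsumer<String, Integer> sender;
    private final List<Blocked> blockedEvents = new ArrayList<>();
    private boolean shut;

    MainThreadValve(BiConsumer<String, Integer> sender) {
        this.sender = sender;
    }

    private void checkRunsInMainThread() {
        if (Thread.currentThread() != mainThread) {
            throw new IllegalStateException("valve accessed outside the main thread");
        }
    }

    // Called when a checkpoint is triggered: hold back events from now on.
    void shut() {
        checkRunsInMainThread();
        shut = true;
    }

    void sendEvent(String event, int subtask) {
        checkRunsInMainThread();
        if (!shut) {
            sender.accept(event, subtask);
        } else {
            blockedEvents.add(new Blocked(event, subtask));
        }
    }

    // Called when the checkpoint completes: release held events in order.
    void openAndRelease() {
        checkRunsInMainThread();
        shut = false;
        for (Blocked b : blockedEvents) {
            sender.accept(b.event, b.subtask);
        }
        blockedEvents.clear();
    }
}
```

Because every method runs on one thread, no lock is needed; the shut/open calls are the rendezvous points with the checkpoint.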
kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r613133900

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java ##
@@ -96,42 +94,44 @@ public boolean isShut() {
 * (because it gets dropped through a call to {@link #reset()} or {@link #resetForTask(int)},
 * then the returned future till be completed exceptionally.
 */
-public CompletableFuture sendEvent(
-SerializedValue event, int subtask) {
-synchronized (lock) {
-if (!shut) {
-return eventSender.apply(event, subtask);
-}
-
-final List eventsForTask =
-blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
-final CompletableFuture future = new CompletableFuture<>();
-eventsForTask.add(new BlockedEvent(event, subtask, future));
-return future;
+public void sendEvent(
+SerializedValue event,
+int subtask,
+CompletableFuture result) {
+checkRunsInMainThread();
+
+if (!shut) {
+final CompletableFuture ack = eventSender.apply(event, subtask);

Review comment:
I guess not. `Execution.sendOperatorEvent` can run concurrently, and the locking version (i.e. the revision before this change) should also work. It is just a matter of the `OperatorCoordinator`'s choice.
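For comparison, here is a hedged sketch of the lock-based alternative: the same shut/buffer/open behavior, guarded by `synchronized` so that events may be sent from any thread, similar in spirit to the revision before this change. `LockingValve` and `sendFromThreads` are hypothetical names; the helper only exists to exercise concurrent senders.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lock-based valve: the same shut/buffer/open behavior, but every
// method is synchronized, so events may be sent from any thread.
class LockingValve {
    private final List<String> forwarded = new ArrayList<>(); // stand-in for the RPC sender
    private final List<String> blocked = new ArrayList<>();
    private boolean shut;

    synchronized void shut() {
        shut = true;
    }

    synchronized void sendEvent(String event) {
        if (shut) {
            blocked.add(event);
        } else {
            forwarded.add(event);
        }
    }

    synchronized void openAndRelease() {
        shut = false;
        forwarded.addAll(blocked);
        blocked.clear();
    }

    synchronized int forwardedCount() {
        return forwarded.size();
    }

    // Test helper: several threads send events concurrently; returns after all finish.
    static void sendFromThreads(LockingValve valve, int threads, int eventsPerThread) {
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < eventsPerThread; i++) {
                    valve.sendEvent("t" + id + "-" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }
}
```

Either model keeps events from leaking past a shut valve; the lock version simply moves the coordination from thread confinement to mutual exclusion.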
kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r613127303

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java ##
@@ -238,8 +241,11 @@ public void notifyCheckpointAborted(long checkpointId) {
 @Override
 public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
 throws Exception {
-// ideally we would like to check this here, however this method is called early during
-// execution graph construction, before the main thread executor is set
+// the first time this method is called is early during execution graph construction,
+// before the main thread executor is set. hence this conditional check.
+if (mainThreadExecutor != null) {
+mainThreadExecutor.assertRunningInMainThread();
+}

Review comment:
I guess "known" does not mean "run on". The construction of the whole RPC endpoint does not run in the main thread, so I would expect the assertion to fail.
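The distinction between an executor being known and code running on it can be shown with a minimal sketch. `SingleThreadMainExecutor` is a hypothetical stand-in for a main-thread executor; its `assertRunningInMainThread()` simply compares the calling thread with the executor's own thread, which is an assumption about how such an assertion behaves.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a main-thread executor. Holding a reference to it
// ("knowing" it) does not mean the caller runs on its thread.
class SingleThreadMainExecutor implements Executor, AutoCloseable {
    private final AtomicReference<Thread> mainThread = new AtomicReference<>();
    private final ExecutorService delegate =
            Executors.newSingleThreadExecutor(
                    runnable -> {
                        Thread thread = new Thread(runnable, "main-thread");
                        thread.setDaemon(true);
                        mainThread.set(thread);
                        return thread;
                    });

    @Override
    public void execute(Runnable command) {
        delegate.execute(command);
    }

    // Fails unless called from the executor's own thread.
    void assertRunningInMainThread() {
        if (Thread.currentThread() != mainThread.get()) {
            throw new IllegalStateException(
                    "expected main thread, but running in " + Thread.currentThread().getName());
        }
    }

    @Override
    public void close() {
        delegate.shutdown();
    }
}
```

Code that merely holds the executor (for example, during construction) fails the check; only work submitted to the executor passes it.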
kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612937053

## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java ##
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayAdapter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A test suite for source enumerator (operator coordinator) for situations where RPC calls for
+ * split assignments (operator events) fails from time to time.
+ */
+@SuppressWarnings("serial")
+public class OperatorEventSendingCheckpointITCase {
+
+private static final int PARALLELISM = 1;
+private static MiniCluster flinkCluster;
+
+@BeforeClass
+public static void setupMiniClusterAndEnv() throws Exception {
+flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM);
+flinkCluster.start();
+TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM);
+}
+
+@AfterClass
+public static void clearEnvAndStopMiniCluster() throws Exception {
+TestStreamEnvironment.unsetAsContext();
+if (flinkCluster != null) {
+flinkCluster.close();
+flinkCluster = null;
+}
+}
+
+//
+// tests
+//
+
+/**
+ * Every second assign split event is lost. Eventually, the enumerator must recognize that an
+ * event was lost and trigger recovery to prevent data loss. Data loss would manifest in a
+ * stalled test, because we could wait
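The lossy scenario described in the test Javadoc can be modelled with a small wrapper around an event sender that silently drops every second event. `DropEverySecondEvent` is hypothetical; the actual test intercepts RPCs in `MiniClusterWithRpcIntercepting`, whose implementation is not shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical wrapper that silently drops every second event, mimicking the
// "every second assign split event is lost" scenario described in the test.
class DropEverySecondEvent<E> implements BiFunction<E, Integer, CompletableFuture<Void>> {
    private final BiFunction<E, Integer, CompletableFuture<Void>> delegate;
    private int calls;

    DropEverySecondEvent(BiFunction<E, Integer, CompletableFuture<Void>> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized CompletableFuture<Void> apply(E event, Integer subtask) {
        calls++;
        if (calls % 2 == 0) {
            // Lost: never forwarded, and the returned future never completes.
            return new CompletableFuture<>();
        }
        return delegate.apply(event, subtask);
    }
}
```

A lost event shows up only as a future that never completes, which is why the enumerator needs some other signal (such as a timeout or checkpoint-time check) to detect the loss and trigger recovery.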
kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612614499

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java ##
@@ -202,8 +202,7 @@ default void notifyCheckpointAborted(long checkpointId) {}
 * target TaskManager. The future is completed exceptionally if the event cannot be sent.
 * That includes situations where the target task is not running.
 */
-CompletableFuture sendEvent(OperatorEvent evt, int targetSubtask)
-throws TaskNotRunningException;
+CompletableFuture sendEvent(OperatorEvent evt, int targetSubtask);

Review comment:
Hmm, I see the value of a "received" future. To solve FLINK-21996 alone (i.e. exactly-once source split assignment), `SourceCoordinator` could take the received future into account for specific events during checkpointing. I am not sure what the next PR will look like, but I think this (my assumed approach) effectively means that `OperatorCoordinator` authors would also have to take exactly-once into account if they want it. I am still not sure how this could help with custom source events and with non-source tasks.
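One way a coordinator could "take the received future into account" is to remember the futures of events that must not be lost and let a checkpoint wait until they are all acknowledged. This is a minimal sketch under that assumption; `PendingEventTracker` and its methods are hypothetical and not part of `SourceCoordinator`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: remembers "received" futures of events that must not be
// lost (e.g. split assignments) so that a checkpoint can wait for them.
class PendingEventTracker {
    private final List<CompletableFuture<?>> pending = new ArrayList<>();

    synchronized <T> CompletableFuture<T> track(CompletableFuture<T> received) {
        pending.add(received);
        // Forget the future once it is done, successfully or not.
        received.whenComplete((v, e) -> remove(received));
        return received;
    }

    private synchronized void remove(CompletableFuture<?> future) {
        pending.remove(future);
    }

    // Completes once every event tracked so far is done; exceptionally if any delivery failed.
    synchronized CompletableFuture<Void> allReceived() {
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

As the comment notes, this puts the exactly-once burden on the coordinator author, who has to decide which events to track.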
kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612544962

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java ##
@@ -202,8 +202,7 @@ default void notifyCheckpointAborted(long checkpointId) {}
 * target TaskManager. The future is completed exceptionally if the event cannot be sent.
 * That includes situations where the target task is not running.
 */
-CompletableFuture sendEvent(OperatorEvent evt, int targetSubtask)
-throws TaskNotRunningException;
+CompletableFuture sendEvent(OperatorEvent evt, int targetSubtask);

Review comment:
> Callers can mostly decide to ignore the result of the future, if they handle the `notifyTaskFaile()` call.

> a dated SubtaskGateway could throw TaskNotRunningException after subtask failed/reset for earlier failing possible active activities.

With `subtaskReady` and `SubtaskGateway`, even silence should not hurt correctness. I think all of these approaches should work. But given that the future will most likely be ignored, I slightly prefer an exception on the call stack. It could be more helpful when diagnosing buggy code.
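The diagnostic argument can be illustrated with a hypothetical gateway bound to a single execution attempt: after the subtask is reset, `sendOrThrow` fails at the call site (with a stack trace pointing at the caller), while `sendAsync` reports the same condition only through a future that the caller may ignore. `AttemptBoundGateway`, its nested `TaskNotRunningException`, and the attempt counter are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical gateway bound to one execution attempt of a subtask. Once the
// subtask fails or is reset, the gateway becomes stale.
class AttemptBoundGateway {
    static class TaskNotRunningException extends Exception {
        TaskNotRunningException(String message) {
            super(message);
        }
    }

    private final AtomicInteger currentAttempt; // shared; bumped when the subtask is reset
    private final int attempt; // attempt this gateway was created for

    AttemptBoundGateway(AtomicInteger currentAttempt) {
        this.currentAttempt = currentAttempt;
        this.attempt = currentAttempt.get();
    }

    // Style preferred in the comment: fail at the call site, so the stack trace
    // points at the code that used a stale gateway.
    void sendOrThrow(String event) throws TaskNotRunningException {
        if (attempt != currentAttempt.get()) {
            throw new TaskNotRunningException("gateway for attempt " + attempt + " is stale");
        }
        // Forwarding the event is omitted in this sketch.
    }

    // Alternative style: report the same condition only through the future,
    // which a caller may simply ignore.
    CompletableFuture<Void> sendAsync(String event) {
        if (attempt != currentAttempt.get()) {
            return CompletableFuture.failedFuture(
                    new TaskNotRunningException("gateway for attempt " + attempt + " is stale"));
        }
        return CompletableFuture.completedFuture(null);
    }
}
```

Both variants detect the stale gateway; the difference is only whether a caller that ignores the result still gets a visible error.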
kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612363186

## File path: flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java ##
@@ -71,40 +70,67 @@
 */
 @Nullable private SplitT currentSplit;
-/** The remaining splits. Null means no splits have yet been assigned. */
-@Nullable private Queue remainingSplits;
+/** The remaining splits that were assigned but not yet processed. */
+private Queue remainingSplits;
+
+private boolean noMoreSplits;
 public IteratorSourceReader(SourceReaderContext context) {
 this.context = checkNotNull(context);
 this.availability = new CompletableFuture<>();
+this.remainingSplits = new ArrayDeque<>();
 }
 //
 @Override
 public void start() {
-// request a split only if we did not get one during restore
-if (remainingSplits == null) {
+// request a split if we don't have one
+if (remainingSplits.isEmpty()) {
 context.sendSplitRequest();
 }
 }
 @Override
 public InputStatus pollNext(ReaderOutput output) {
-if (iterator != null && iterator.hasNext()) {
-output.collect(iterator.next());
+if (iterator != null) {
+if (iterator.hasNext()) {
+output.collect(iterator.next());
+return InputStatus.MORE_AVAILABLE;
+} else {
+finishSplit();
+}
+}
+
+return tryMoveToNextSplit();
+}
+
+private void finishSplit() {
+iterator = null;
+currentSplit = null;
+
+// request another split if no other is left
+// we do this only here in the finishSplit part to avoid requesting a split
+// whenever the reader is polled and doesn't currently have a split
+if (remainingSplits.isEmpty() && !noMoreSplits) {

Review comment:
Apparently I overlooked the comment.
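The control flow in the hunk above can be modelled with a simplified reader. It keeps the rule from the diff's comment that a new split is requested only when a split finishes, not on every poll without a split. `ToyIteratorReader` is hypothetical, `splitRequests` stands in for `context.sendSplitRequest()`, and the body of `tryMoveToNextSplit()` is an assumption because the hunk does not show it.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of the reader loop in the hunk; not Flink's
// IteratorSourceReader. tryMoveToNextSplit() is an assumed implementation.
class ToyIteratorReader {
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private final Queue<List<Integer>> remainingSplits = new ArrayDeque<>();
    private Iterator<Integer> iterator;
    private boolean noMoreSplits;
    int splitRequests; // stand-in for calls to context.sendSplitRequest()

    void start() {
        // Request a split if we don't have one.
        if (remainingSplits.isEmpty()) {
            splitRequests++;
        }
    }

    void addSplits(List<List<Integer>> splits) {
        remainingSplits.addAll(splits);
    }

    void notifyNoMoreSplits() {
        noMoreSplits = true;
    }

    Status pollNext(List<Integer> output) {
        if (iterator != null) {
            if (iterator.hasNext()) {
                output.add(iterator.next());
                return Status.MORE_AVAILABLE;
            }
            finishSplit();
        }
        return tryMoveToNextSplit();
    }

    private void finishSplit() {
        iterator = null;
        // Request only when a split finishes, not on every poll without a split.
        if (remainingSplits.isEmpty() && !noMoreSplits) {
            splitRequests++;
        }
    }

    private Status tryMoveToNextSplit() {
        List<Integer> next = remainingSplits.poll();
        if (next != null) {
            iterator = next.iterator();
            return Status.MORE_AVAILABLE;
        }
        return noMoreSplits ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
    }
}
```

Polling repeatedly while waiting for a split returns `NOTHING_AVAILABLE` without issuing additional requests, which is the behavior the diff's comment is guarding.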
kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r611302632

## File path: flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java ##
@@ -71,40 +70,67 @@
 */
 @Nullable private SplitT currentSplit;
-/** The remaining splits. Null means no splits have yet been assigned. */
-@Nullable private Queue remainingSplits;
+/** The remaining splits that were assigned but not yet processed. */
+private Queue remainingSplits;

Review comment:
`final`?

## File path: flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java ##
@@ -71,40 +70,67 @@
 */
 @Nullable private SplitT currentSplit;
-/** The remaining splits. Null means no splits have yet been assigned. */
-@Nullable private Queue remainingSplits;
+/** The remaining splits that were assigned but not yet processed. */
+private Queue remainingSplits;
+
+private boolean noMoreSplits;
 public IteratorSourceReader(SourceReaderContext context) {
 this.context = checkNotNull(context);
 this.availability = new CompletableFuture<>();
+this.remainingSplits = new ArrayDeque<>();
 }
 //
 @Override
 public void start() {
-// request a split only if we did not get one during restore
-if (remainingSplits == null) {
+// request a split if we don't have one
+if (remainingSplits.isEmpty()) {
 context.sendSplitRequest();
 }
 }
 @Override
 public InputStatus pollNext(ReaderOutput output) {
-if (iterator != null && iterator.hasNext()) {
-output.collect(iterator.next());
+if (iterator != null) {
+if (iterator.hasNext()) {
+output.collect(iterator.next());
+return InputStatus.MORE_AVAILABLE;
+} else {
+finishSplit();
+}
+}
+
+return tryMoveToNextSplit();
+}
+
+private void finishSplit() {
+iterator = null;
+currentSplit = null;
+
+// request another split if no other is left
+// we do this only here in the finishSplit part to avoid requesting a split
+// whenever the reader is polled and doesn't currently have a split
+if (remainingSplits.isEmpty() && !noMoreSplits) {

Review comment:
How about moving this to `tryMoveToNextSplit`?

## File path: flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java ##
@@ -71,40 +70,67 @@
 */
 @Nullable private SplitT currentSplit;
-/** The remaining splits. Null means no splits have yet been assigned. */
-@Nullable private Queue remainingSplits;
+/** The remaining splits that were assigned but not yet processed. */
+private Queue remainingSplits;
+
+private boolean noMoreSplits;
 public IteratorSourceReader(SourceReaderContext context) {
 this.context = checkNotNull(context);
 this.availability = new CompletableFuture<>();
+this.remainingSplits = new ArrayDeque<>();
 }
 //
 @Override
 public void start() {
-// request a split only if we did not get one during restore
-if (remainingSplits == null) {
+// request a split if we don't have one
+if (remainingSplits.isEmpty()) {
 context.sendSplitRequest();
 }
 }
 @Override
 public InputStatus pollNext(ReaderOutput output) {
-if (iterator != null && iterator.hasNext()) {
-output.collect(iterator.next());
+if (iterator != null) {
+if (iterator.hasNext()) {
+output.collect(iterator.next());
+return InputStatus.MORE_AVAILABLE;
+} else {
+finishSplit();
+}
+}
+
+return tryMoveToNextSplit();
+}
+
+private void finishSplit() {
+iterator = null;
+currentSplit = null;
+
+// request another split if no other is left
+// we do this only here in the finishSplit part to avoid requesting a split
+// whenever the reader is polled and doesn't currently have a split
+if (remainingSplits.isEmpty() && !noMoreSplits) {

Review comment:
That should make the `InputStatus.NOTHING_AVAILABLE` clause easier to evaluate.

## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java ##
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for