[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model

2021-04-15 Thread GitBox


kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r614264965



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##
@@ -202,8 +202,7 @@ default void notifyCheckpointAborted(long checkpointId) {}
  * target TaskManager. The future is completed exceptionally if the 
event cannot be sent.
  * That includes situations where the target task is not running.
  */
-CompletableFuture sendEvent(OperatorEvent evt, int 
targetSubtask)
-throws TaskNotRunningException;
+CompletableFuture sendEvent(OperatorEvent evt, int 
targetSubtask);

Review comment:
   I wonder whether this signature mixes two different requirements:
   * `void transferState(OperatorStateEvent evt) throws 
TaskNotRunningException` (throwing is optional but might be useful). A send 
failure fails over the subtask. This means that the coordinator wants to 
transfer part of its state to the subtask.
   * `CompletableFuture sendEvent(OperatorEvent evt, Duration timeout)`. 
Sends an event without an exactly-once guarantee. Dealing with failure is the 
author's responsibility, hence the timeout parameter. The event neither 
contributes to nor belongs to the state on either side.
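
   To make the two requirements above concrete, here is a minimal sketch of them as separate interfaces. All names in it (`StateTransferGateway`, `BestEffortGateway`, `StateEvent`, `StoppedSubtask`, and the local `TaskNotRunningException` stand-in) are hypothetical and exist only for illustration; none of this is Flink API, and the timeout is accepted but not enforced in this toy.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class TwoSendContractsSketch {

    /** Hypothetical stand-in for an event that carries coordinator state. */
    static final class StateEvent {
        final String payload;

        StateEvent(String payload) {
            this.payload = payload;
        }
    }

    /** Local stand-in for the exception named in the diff (not the Flink class). */
    static final class TaskNotRunningException extends Exception {
        TaskNotRunningException(String message) {
            super(message);
        }
    }

    /** Requirement 1: the event is part of the state, so a failed send must be loud. */
    interface StateTransferGateway {
        void transferState(StateEvent evt) throws TaskNotRunningException;
    }

    /** Requirement 2: best effort, the caller deals with failure and timeout. */
    interface BestEffortGateway {
        CompletableFuture<Void> sendEvent(String evt, Duration timeout);
    }

    /** Toy gateway for a subtask that is not running. */
    static final class StoppedSubtask implements StateTransferGateway, BestEffortGateway {
        @Override
        public void transferState(StateEvent evt) throws TaskNotRunningException {
            throw new TaskNotRunningException("subtask is not running");
        }

        @Override
        public CompletableFuture<Void> sendEvent(String evt, Duration timeout) {
            // The timeout is ignored here; a real implementation would enforce it.
            CompletableFuture<Void> result = new CompletableFuture<>();
            result.completeExceptionally(new IllegalStateException("subtask is not running"));
            return result;
        }
    }

    /** Returns true if the state transfer surfaced the failure as an exception. */
    static boolean transferFails() {
        try {
            new StoppedSubtask().transferState(new StateEvent("split-1"));
            return false;
        } catch (TaskNotRunningException e) {
            return true;
        }
    }

    /** Returns true if the best-effort send reported the failure through its future. */
    static boolean bestEffortFails() {
        return new StoppedSubtask()
                .sendEvent("ping", Duration.ofSeconds(1))
                .isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("transferState failed: " + transferFails());
        System.out.println("sendEvent failed: " + bestEffortFails());
    }
}
```

   The point of the split is that the first contract cannot be silently ignored by the caller, while the second leaves the decision entirely to the caller.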
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model

2021-04-14 Thread GitBox


kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r613244353



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##
@@ -96,42 +94,44 @@ public boolean isShut() {
  * (because it gets dropped through a call to {@link #reset()} or {@link 
#resetForTask(int)},
  * then the returned future till be completed exceptionally.
  */
-public CompletableFuture sendEvent(
-SerializedValue event, int subtask) {
-synchronized (lock) {
-if (!shut) {
-return eventSender.apply(event, subtask);
-}
-
-final List eventsForTask =
-blockedEvents.computeIfAbsent(subtask, (key) -> new 
ArrayList<>());
-final CompletableFuture future = new 
CompletableFuture<>();
-eventsForTask.add(new BlockedEvent(event, subtask, future));
-return future;
+public void sendEvent(
+SerializedValue event,
+int subtask,
+CompletableFuture result) {
+checkRunsInMainThread();
+
+if (!shut) {
+final CompletableFuture ack = 
eventSender.apply(event, subtask);

Review comment:
   Skimming through `sendOperatorEvent`, I think it is thread-safe for 
`SubtaskGateway`. The points are:
   * After `subtaskReady` and `RUNNING` (or `INITIALIZING`), `assignedResource` 
is visible anyway. Thus, no send will be lost.
   * Upon failure, the kind of failure does not actually matter.
   
   Though, this "thread-safety" could be an accident.








[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model

2021-04-14 Thread GitBox


kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r613137597



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##
@@ -96,42 +94,44 @@ public boolean isShut() {
  * (because it gets dropped through a call to {@link #reset()} or {@link 
#resetForTask(int)},
  * then the returned future till be completed exceptionally.
  */
-public CompletableFuture sendEvent(
-SerializedValue event, int subtask) {
-synchronized (lock) {
-if (!shut) {
-return eventSender.apply(event, subtask);
-}
-
-final List eventsForTask =
-blockedEvents.computeIfAbsent(subtask, (key) -> new 
ArrayList<>());
-final CompletableFuture future = new 
CompletableFuture<>();
-eventsForTask.add(new BlockedEvent(event, subtask, future));
-return future;
+public void sendEvent(
+SerializedValue event,
+int subtask,
+CompletableFuture result) {
+checkRunsInMainThread();
+
+if (!shut) {
+final CompletableFuture ack = 
eventSender.apply(event, subtask);

Review comment:
   I am actually neutral on either threading model in 
`OperatorCoordinator`; it is purely an implementation detail. Whichever we 
choose, a rendezvous between checkpoint completion and event sending is 
unavoidable.
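
   As an illustration of that rendezvous, here is a minimal lock-based sketch in the spirit of the pre-change valve shown in the diff: events sent while the valve is shut are buffered and only delivered once it is reopened. The class and method names (`EventValveSketch`, `shut`, `open`, `snapshotDelivered`) are made up for this sketch and events are plain strings; a single-threaded variant would need the same buffering, just without the lock.

```java
import java.util.ArrayList;
import java.util.List;

public class EventValveSketch {

    private final Object lock = new Object();
    private final List<String> delivered = new ArrayList<>();
    private final List<String> blocked = new ArrayList<>();
    private boolean shut;

    /** Called when a checkpoint starts: from now on events are held back. */
    void shut() {
        synchronized (lock) {
            shut = true;
        }
    }

    /** Delivers the event immediately, or buffers it while the valve is shut. */
    void sendEvent(String event) {
        synchronized (lock) {
            if (shut) {
                blocked.add(event);
            } else {
                delivered.add(event);
            }
        }
    }

    /** Called when the checkpoint is done: flushes buffered events in order. */
    void open() {
        synchronized (lock) {
            shut = false;
            delivered.addAll(blocked);
            blocked.clear();
        }
    }

    /** Returns a copy of everything delivered so far. */
    List<String> snapshotDelivered() {
        synchronized (lock) {
            return new ArrayList<>(delivered);
        }
    }

    /** What has been delivered while a checkpoint is still in progress. */
    static List<String> deliveredWhileShut() {
        EventValveSketch valve = new EventValveSketch();
        valve.sendEvent("before");
        valve.shut();
        valve.sendEvent("during");
        return valve.snapshotDelivered();
    }

    /** What has been delivered once the valve is reopened. */
    static List<String> deliveredAfterReopen() {
        EventValveSketch valve = new EventValveSketch();
        valve.sendEvent("before");
        valve.shut();
        valve.sendEvent("during");
        valve.open();
        return valve.snapshotDelivered();
    }

    public static void main(String[] args) {
        System.out.println("while shut: " + deliveredWhileShut());
        System.out.println("after reopen: " + deliveredAfterReopen());
    }
}
```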








[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model

2021-04-14 Thread GitBox


kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r613133900



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##
@@ -96,42 +94,44 @@ public boolean isShut() {
  * (because it gets dropped through a call to {@link #reset()} or {@link 
#resetForTask(int)},
  * then the returned future till be completed exceptionally.
  */
-public CompletableFuture sendEvent(
-SerializedValue event, int subtask) {
-synchronized (lock) {
-if (!shut) {
-return eventSender.apply(event, subtask);
-}
-
-final List eventsForTask =
-blockedEvents.computeIfAbsent(subtask, (key) -> new 
ArrayList<>());
-final CompletableFuture future = new 
CompletableFuture<>();
-eventsForTask.add(new BlockedEvent(event, subtask, future));
-return future;
+public void sendEvent(
+SerializedValue event,
+int subtask,
+CompletableFuture result) {
+checkRunsInMainThread();
+
+if (!shut) {
+final CompletableFuture ack = 
eventSender.apply(event, subtask);

Review comment:
   I guess not. `Execution.sendOperatorEvent` can run concurrently, and the 
locking version (e.g. the revision before this change) should also work. It 
is just a matter of `OperatorCoordinator`'s choice.








[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model

2021-04-14 Thread GitBox


kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r613127303



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##
@@ -238,8 +241,11 @@ public void notifyCheckpointAborted(long checkpointId) {
 @Override
 public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
 throws Exception {
-// ideally we would like to check this here, however this method is 
called early during
-// execution graph construction, before the main thread executor is set
+// the first time this method is called is early during execution 
graph construction,
+// before the main thread executor is set. hence this conditional 
check.
+if (mainThreadExecutor != null) {
+mainThreadExecutor.assertRunningInMainThread();
+}

Review comment:
   I guess "known" does not mean "run on". The construction of the whole RPC 
endpoint does not run in the main thread, so I expect the assertion to fail.
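
   The distinction can be shown with a minimal sketch, assuming a hypothetical `MainThreadChecker` that mimics the idea of a main thread assertion (it is not Flink's main thread executor). Once the main thread is known, the assertion still fails when invoked from the constructing thread, and only passes when run on the executor's own thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class MainThreadAssertionSketch {

    /** Hypothetical checker: remembers which thread counts as the main thread. */
    static final class MainThreadChecker {
        private final AtomicReference<Thread> mainThread = new AtomicReference<>();

        void bind(Thread thread) {
            mainThread.set(thread);
        }

        boolean isKnown() {
            return mainThread.get() != null;
        }

        void assertRunningInMainThread() {
            if (Thread.currentThread() != mainThread.get()) {
                throw new IllegalStateException("not running in the main thread");
            }
        }
    }

    /** Returns {main thread known, assertion fails outside, assertion passes inside}. */
    static boolean[] run() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            MainThreadChecker checker = new MainThreadChecker();

            // Make the main thread "known" by recording the executor's thread.
            executor.submit(() -> checker.bind(Thread.currentThread())).get();

            // Asserting from the constructing thread: known, but not "run on".
            boolean failsOutside;
            try {
                checker.assertRunningInMainThread();
                failsOutside = false;
            } catch (IllegalStateException e) {
                failsOutside = true;
            }

            // Asserting from within the executor thread succeeds.
            boolean passesInside =
                    executor.submit(
                                    () -> {
                                        checker.assertRunningInMainThread();
                                        return true;
                                    })
                            .get();

            return new boolean[] {checker.isKnown(), failsOutside, passesInside};
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        boolean[] result = run();
        System.out.println("known=" + result[0]
                + ", failsOutside=" + result[1]
                + ", passesInside=" + result[2]);
    }
}
```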








[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model

2021-04-13 Thread GitBox


kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612937053



##
File path: 
flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayAdapter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A test suite for source enumerator (operator coordinator) for situations 
where RPC calls for
+ * split assignments (operator events) fails from time to time.
+ */
+@SuppressWarnings("serial")
+public class OperatorEventSendingCheckpointITCase {
+
+private static final int PARALLELISM = 1;
+private static MiniCluster flinkCluster;
+
+@BeforeClass
+public static void setupMiniClusterAndEnv() throws Exception {
+flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM);
+flinkCluster.start();
+TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM);
+}
+
+@AfterClass
+public static void clearEnvAndStopMiniCluster() throws Exception {
+TestStreamEnvironment.unsetAsContext();
+if (flinkCluster != null) {
+flinkCluster.close();
+flinkCluster = null;
+}
+}
+
+// 
+//  tests
+// 
+
+/**
+ * Every second assign split event is lost. Eventually, the enumerator 
must recognize that an
+ * event was lost and trigger recovery to prevent data loss. Data loss 
would manifest in a
+ * stalled test, because we could wait 

[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model

2021-04-13 Thread GitBox


kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612614499



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##
@@ -202,8 +202,7 @@ default void notifyCheckpointAborted(long checkpointId) {}
  * target TaskManager. The future is completed exceptionally if the 
event cannot be sent.
  * That includes situations where the target task is not running.
  */
-CompletableFuture sendEvent(OperatorEvent evt, int 
targetSubtask)
-throws TaskNotRunningException;
+CompletableFuture sendEvent(OperatorEvent evt, int 
targetSubtask);

Review comment:
   Hmmm, I see the value of the "received" future. To solve just FLINK-21996 
(i.e. exactly-once source split assignment), `SourceCoordinator` could take 
the received future into account for specific events during checkpointing. I 
am not sure what the next PR will look like. But I think this (my assumed 
approach) actually means that `OperatorCoordinator` authors must also take 
exactly-once into account if they want it.
   
   I am still not sure how this could help with custom source events and 
non-source tasks.








[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model

2021-04-13 Thread GitBox


kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612544962



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##
@@ -202,8 +202,7 @@ default void notifyCheckpointAborted(long checkpointId) {}
  * target TaskManager. The future is completed exceptionally if the 
event cannot be sent.
  * That includes situations where the target task is not running.
  */
-CompletableFuture sendEvent(OperatorEvent evt, int 
targetSubtask)
-throws TaskNotRunningException;
+CompletableFuture sendEvent(OperatorEvent evt, int 
targetSubtask);

Review comment:
   > Callers can mostly decide to ignore the result of the future, if they 
handle the `notifyTaskFaile()` call.
   
   > a dated SubtaskGateway could throw TaskNotRunningException after subtask 
failed/reset for earlier failing possible active activities.
   
   With `subtaskReady` and `SubtaskGateway`, even silence should not hurt 
correctness. I think everything should work. But given that the future is 
most likely ignored, I slightly prefer an exception thrown on the call stack. 
It could be more helpful when diagnosing buggy code.
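
   The diagnostic difference can be seen in a minimal sketch. The two methods below (`sendViaFuture` and `sendOrThrow`) are hypothetical simplifications, not the actual `sendEvent` signatures: when the failure travels inside a future that the caller drops, nothing surfaces, whereas a thrown exception lands on the caller's stack.

```java
import java.util.concurrent.CompletableFuture;

public class IgnoredFutureSketch {

    /** Future-based variant: the failure travels inside the returned future. */
    static CompletableFuture<Void> sendViaFuture(boolean taskRunning) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (taskRunning) {
            result.complete(null);
        } else {
            result.completeExceptionally(new IllegalStateException("task not running"));
        }
        return result;
    }

    /** Throwing variant: the failure surfaces directly on the caller's stack. */
    static void sendOrThrow(boolean taskRunning) {
        if (!taskRunning) {
            throw new IllegalStateException("task not running");
        }
    }

    /** A caller that drops the returned future never observes the failure. */
    static boolean callerNoticesFutureFailure() {
        try {
            sendViaFuture(false); // result deliberately ignored
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** The same caller cannot miss the failure when it is thrown. */
    static boolean callerNoticesThrownFailure() {
        try {
            sendOrThrow(false);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("future variant noticed: " + callerNoticesFutureFailure());
        System.out.println("throwing variant noticed: " + callerNoticesThrownFailure());
    }
}
```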








[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model

2021-04-13 Thread GitBox


kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612363186



##
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##
@@ -71,40 +70,67 @@
  */
 @Nullable private SplitT currentSplit;
 
-/** The remaining splits. Null means no splits have yet been assigned. */
-@Nullable private Queue remainingSplits;
+/** The remaining splits that were assigned but not yet processed. */
+private Queue remainingSplits;
+
+private boolean noMoreSplits;
 
 public IteratorSourceReader(SourceReaderContext context) {
 this.context = checkNotNull(context);
 this.availability = new CompletableFuture<>();
+this.remainingSplits = new ArrayDeque<>();
 }
 
 // 
 
 @Override
 public void start() {
-// request a split only if we did not get one during restore
-if (remainingSplits == null) {
+// request a split if we don't have one
+if (remainingSplits.isEmpty()) {
 context.sendSplitRequest();
 }
 }
 
 @Override
 public InputStatus pollNext(ReaderOutput output) {
-if (iterator != null && iterator.hasNext()) {
-output.collect(iterator.next());
+if (iterator != null) {
+if (iterator.hasNext()) {
+output.collect(iterator.next());
+return InputStatus.MORE_AVAILABLE;
+} else {
+finishSplit();
+}
+}
+
+return tryMoveToNextSplit();
+}
+
+private void finishSplit() {
+iterator = null;
+currentSplit = null;
+
+// request another split if no other is left
+// we do this only here in the finishSplit part to avoid requesting a 
split
+// whenever the reader is polled and doesn't currently have a split
+if (remainingSplits.isEmpty() && !noMoreSplits) {

Review comment:
   I apparently overlooked the comment.








[GitHub] [flink] kezhuw commented on a change in pull request #15557: [FLINK-21996][coordination] - Part one: Tests and adjusted threading model

2021-04-13 Thread GitBox


kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r611302632



##
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##
@@ -71,40 +70,67 @@
  */
 @Nullable private SplitT currentSplit;
 
-/** The remaining splits. Null means no splits have yet been assigned. */
-@Nullable private Queue remainingSplits;
+/** The remaining splits that were assigned but not yet processed. */
+private Queue remainingSplits;

Review comment:
   `final` ?

##
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##
@@ -71,40 +70,67 @@
  */
 @Nullable private SplitT currentSplit;
 
-/** The remaining splits. Null means no splits have yet been assigned. */
-@Nullable private Queue remainingSplits;
+/** The remaining splits that were assigned but not yet processed. */
+private Queue remainingSplits;
+
+private boolean noMoreSplits;
 
 public IteratorSourceReader(SourceReaderContext context) {
 this.context = checkNotNull(context);
 this.availability = new CompletableFuture<>();
+this.remainingSplits = new ArrayDeque<>();
 }
 
 // 
 
 @Override
 public void start() {
-// request a split only if we did not get one during restore
-if (remainingSplits == null) {
+// request a split if we don't have one
+if (remainingSplits.isEmpty()) {
 context.sendSplitRequest();
 }
 }
 
 @Override
 public InputStatus pollNext(ReaderOutput output) {
-if (iterator != null && iterator.hasNext()) {
-output.collect(iterator.next());
+if (iterator != null) {
+if (iterator.hasNext()) {
+output.collect(iterator.next());
+return InputStatus.MORE_AVAILABLE;
+} else {
+finishSplit();
+}
+}
+
+return tryMoveToNextSplit();
+}
+
+private void finishSplit() {
+iterator = null;
+currentSplit = null;
+
+// request another split if no other is left
+// we do this only here in the finishSplit part to avoid requesting a 
split
+// whenever the reader is polled and doesn't currently have a split
+if (remainingSplits.isEmpty() && !noMoreSplits) {

Review comment:
   How about moving this to `tryMoveToNextSplit`?

##
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##
@@ -71,40 +70,67 @@
  */
 @Nullable private SplitT currentSplit;
 
-/** The remaining splits. Null means no splits have yet been assigned. */
-@Nullable private Queue remainingSplits;
+/** The remaining splits that were assigned but not yet processed. */
+private Queue remainingSplits;
+
+private boolean noMoreSplits;
 
 public IteratorSourceReader(SourceReaderContext context) {
 this.context = checkNotNull(context);
 this.availability = new CompletableFuture<>();
+this.remainingSplits = new ArrayDeque<>();
 }
 
 // 
 
 @Override
 public void start() {
-// request a split only if we did not get one during restore
-if (remainingSplits == null) {
+// request a split if we don't have one
+if (remainingSplits.isEmpty()) {
 context.sendSplitRequest();
 }
 }
 
 @Override
 public InputStatus pollNext(ReaderOutput output) {
-if (iterator != null && iterator.hasNext()) {
-output.collect(iterator.next());
+if (iterator != null) {
+if (iterator.hasNext()) {
+output.collect(iterator.next());
+return InputStatus.MORE_AVAILABLE;
+} else {
+finishSplit();
+}
+}
+
+return tryMoveToNextSplit();
+}
+
+private void finishSplit() {
+iterator = null;
+currentSplit = null;
+
+// request another split if no other is left
+// we do this only here in the finishSplit part to avoid requesting a 
split
+// whenever the reader is polled and doesn't currently have a split
+if (remainingSplits.isEmpty() && !noMoreSplits) {

Review comment:
   That should make the `InputStatus.NOTHING_AVAILABLE` clause easier to 
evaluate.
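
   A rough sketch of what requesting the split from `tryMoveToNextSplit` could look like, under stated assumptions: this is a simplified stand-in for the reader, not the real `IteratorSourceReader`. The `requestPending` flag is a hypothetical guard that keeps the reader from sending a request on every poll (the concern raised by the code comment in the diff), and `requestsSent` replaces the actual `context.sendSplitRequest()` call.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

public class SplitRequestPlacementSketch {

    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private final Queue<List<Integer>> remainingSplits = new ArrayDeque<>();
    private Iterator<Integer> iterator;
    private boolean noMoreSplits;
    private boolean requestPending; // hypothetical guard against repeated requests
    int requestsSent;               // stands in for context.sendSplitRequest()

    void addSplit(List<Integer> split) {
        remainingSplits.add(split);
        requestPending = false;
    }

    void notifyNoMoreSplits() {
        noMoreSplits = true;
    }

    Status pollNext() {
        if (iterator != null) {
            if (iterator.hasNext()) {
                iterator.next(); // a real reader would emit the record here
                return Status.MORE_AVAILABLE;
            }
            iterator = null; // the current split is finished
        }
        return tryMoveToNextSplit();
    }

    private Status tryMoveToNextSplit() {
        List<Integer> next = remainingSplits.poll();
        if (next != null) {
            iterator = next.iterator();
            return Status.MORE_AVAILABLE;
        }
        if (noMoreSplits) {
            return Status.END_OF_INPUT;
        }
        // The request lives next to the NOTHING_AVAILABLE branch.
        if (!requestPending) {
            requestPending = true;
            requestsSent++;
        }
        return Status.NOTHING_AVAILABLE;
    }

    /** Runs a small scenario and returns the number of split requests sent. */
    static int demoRequests() {
        SplitRequestPlacementSketch reader = new SplitRequestPlacementSketch();
        reader.pollNext();                    // no split yet: first request
        reader.pollNext();                    // still waiting: no second request
        reader.addSplit(Arrays.asList(1, 2));
        reader.pollNext();                    // moves to the split
        reader.pollNext();                    // consumes 1
        reader.pollNext();                    // consumes 2
        reader.pollNext();                    // split finished: second request
        return reader.requestsSent;
    }

    /** Returns the status reported after the enumerator signals no more splits. */
    static Status demoFinalStatus() {
        SplitRequestPlacementSketch reader = new SplitRequestPlacementSketch();
        reader.pollNext();
        reader.notifyNoMoreSplits();
        return reader.pollNext();
    }

    public static void main(String[] args) {
        System.out.println("requests sent: " + demoRequests());
        System.out.println("final status: " + demoFinalStatus());
    }
}
```

   With this placement the decision between requesting a split, returning `NOTHING_AVAILABLE`, and returning `END_OF_INPUT` sits in one method, at the cost of the extra guard flag.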

##
File path: 
flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for