[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-04 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r353930966
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -84,16 +83,30 @@
private final StreamTaskActionExecutor actionExecutor;
 
public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
-   this(mailboxDefaultAction, new 
TaskMailboxImpl(Thread.currentThread()), StreamTaskActionExecutor.IMMEDIATE);
+   this(mailboxDefaultAction, StreamTaskActionExecutor.IMMEDIATE);
}
 
-   public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, 
TaskMailbox mailbox, StreamTaskActionExecutor actionExecutor) {
+   public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction,
 
 Review comment:
   fixed, thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-04 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r353776003
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -1433,35 +1423,16 @@ protected void init() {}
 
@Override
protected void processInput(MailboxDefaultAction.Controller 
controller) throws Exception {
-   holder = new LockHolder(getCheckpointLock(), latch);
-   holder.start();
-   latch.await();
-
-   // we are at the point where cancelling can happen
-   syncLatch.trigger();
-
-   // just put this to sleep until it is interrupted
-   try {
-   Thread.sleep(1);
-   } catch (InterruptedException ignored) {
-   // restore interruption state
-   Thread.currentThread().interrupt();
+   syncLatch.trigger(); // signal that the task can be 
cancelled now
+   while (task.getExecutionState() == 
ExecutionState.RUNNING) { // wait for the containing task to be terminated from 
the outside
+   try {
+   Thread.sleep(50);
+   } catch (InterruptedException e) {
+   LOG.debug("interrupted while waiting 
for state transition", e);
+   Thread.currentThread().interrupt();
+   }
 
 Review comment:
   To summarize offline discussion:
   
   > Does this solve all of the issues?
   
   No, it only solves this particular issue.
   There are scenarios where shutdown can hang because of misbehaving source threads. Before the mailbox model this wasn't an issue.
   To address it we need to change the shutdown behaviour (`Task` and `StreamTask`).
   
   > Maybe this test is just invalid and could be dropped?
   
   The test is valid but probably tests an esoteric situation.
   
   Still, we decided to keep it and the associated change, because:
   1) internal mailbox actions shouldn't acquire the lock
   2) it's not trivial to enqueue a mail that touches task state and doesn't acquire the lock




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-03 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r353589776
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -1433,35 +1423,16 @@ protected void init() {}
 
@Override
protected void processInput(MailboxDefaultAction.Controller 
controller) throws Exception {
-   holder = new LockHolder(getCheckpointLock(), latch);
-   holder.start();
-   latch.await();
-
-   // we are at the point where cancelling can happen
-   syncLatch.trigger();
-
-   // just put this to sleep until it is interrupted
-   try {
-   Thread.sleep(1);
-   } catch (InterruptedException ignored) {
-   // restore interruption state
-   Thread.currentThread().interrupt();
+   syncLatch.trigger(); // signal that the task can be 
cancelled now
+   while (task.getExecutionState() == 
ExecutionState.RUNNING) { // wait for the containing task to be terminated from 
the outside
+   try {
+   Thread.sleep(50);
+   } catch (InterruptedException e) {
+   LOG.debug("interrupted while waiting 
for state transition", e);
+   Thread.currentThread().interrupt();
+   }
 
 Review comment:
   The `TaskCanceller` thread started from `Task` was canceling `StreamTask`, which in the end sent a mail to stop the `MailboxProcessor` loop.
   But after synchronizing each mail execution, `MailboxProcessor` was not able to execute that mail: the lock was held by a thread simulating an "ill-behaving" operator from the test.
   That thread wasn't interrupted, because the interrupt is sent only after the loop breaks.
   
   So I removed lock acquisition for this and 3 other "internal" mails (this reflects the original logic and avoids the performance cost of alternatives such as making `mailboxLoopRunning` `volatile`).
   To do this I send those mails directly from `MailboxProcessor` instead of using `MailboxExecutor#executeFirst` (which I removed, as it was only used for this).
   
   Probably, we should review the order of tear-down actions in StreamTask to 
make it more resilient to such scenarios.
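
   The lock-free path for internal mails described above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the actual Flink code: `SketchMailboxLoop`, `submitExternal`, `stopInternal`, and `stopsWhileLockIsHeld` are hypothetical names. External mails are wrapped so that they run under a shared lock, while the internal stop mail is enqueued unwrapped, so the loop can terminate even while another thread holds the lock.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified stand-in for a mailbox loop; not Flink's MailboxProcessor. */
class SketchMailboxLoop {
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private final Object lock;
    // Read and written only by the thread that runs the loop, so no volatile is needed.
    private boolean running = true;

    SketchMailboxLoop(Object lock) {
        this.lock = lock;
    }

    /** External mail: wrapped so that it executes while holding the lock. */
    void submitExternal(Runnable action) {
        mailbox.add(() -> {
            synchronized (lock) {
                action.run();
            }
        });
    }

    /** Internal mail: enqueued as-is, so running it never waits for the lock. */
    void stopInternal() {
        mailbox.add(() -> running = false);
    }

    /** Processes mails on the calling thread until the stop mail has run. */
    void runLoop() {
        try {
            while (running) {
                mailbox.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true if the loop stops while another thread still holds the lock. */
    static boolean stopsWhileLockIsHeld() {
        Object lock = new Object();
        CountDownLatch lockAcquired = new CountDownLatch(1);
        CountDownLatch releaseLock = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            synchronized (lock) {
                lockAcquired.countDown();
                try {
                    releaseLock.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        holder.start();
        try {
            lockAcquired.await();
            SketchMailboxLoop loop = new SketchMailboxLoop(lock);
            loop.stopInternal();
            loop.runLoop();          // returns although the lock is still held
            return holder.isAlive(); // the holder has not released the lock yet
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            releaseLock.countDown(); // let the holder thread finish
        }
    }
}
```

   In this sketch `running` is only touched by the loop thread, which is why the stop mail needs neither the lock nor a `volatile` flag.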
   




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-03 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r353447124
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -1433,35 +1423,16 @@ protected void init() {}
 
@Override
protected void processInput(MailboxDefaultAction.Controller 
controller) throws Exception {
-   holder = new LockHolder(getCheckpointLock(), latch);
-   holder.start();
-   latch.await();
-
-   // we are at the point where cancelling can happen
-   syncLatch.trigger();
-
-   // just put this to sleep until it is interrupted
-   try {
-   Thread.sleep(1);
-   } catch (InterruptedException ignored) {
-   // restore interruption state
-   Thread.currentThread().interrupt();
+   syncLatch.trigger(); // signal that the task can be 
cancelled now
+   while (task.getExecutionState() == 
ExecutionState.RUNNING) { // wait for the containing task to be terminated from 
the outside
+   try {
+   Thread.sleep(50);
+   } catch (InterruptedException e) {
+   LOG.debug("interrupted while waiting 
for state transition", e);
+   Thread.currentThread().interrupt();
+   }
 
 Review comment:
   Addressed by not acquiring the lock for *internal* `MailboxProcessor` mails.
   I.e., `allActionsCompleted()` and 3 other similar actions don't use `MailboxExecutor` or `StreamTaskActionExecutor` and go directly into `Mailbox`.
   
   Please see hotfix commit 63d104b52a7fbca8a4d2dd2ce0f4a8d4bdad8178.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-03 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r353443775
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Verifies that {@link StreamTask} {@link StreamTaskActionExecutor decorates 
execution} of actions that potentially needs to be synchronized.
+ */
+public class StreamTaskExecutionDecorationTest {
+   private CountingStreamTaskActionExecutor decorator;
+   private StreamTask> task;
+
+   @Test
+   public void testAbortCheckpointOnBarrierIsDecorated() throws Exception {
+   task.abortCheckpointOnBarrier(1, null);
+   verify();
+   }
+
+   @Test
+   public void testTriggerCheckpointOnBarrierIsDecorated() throws 
Exception {
+   task.triggerCheckpointOnBarrier(new CheckpointMetaData(1, 2), 
new CheckpointOptions(CheckpointType.CHECKPOINT, new 
CheckpointStorageLocationReference(new byte[]{1})), null);
+   verify();
+   }
+
+   @Test
+   public void testTriggerCheckpointAsyncIsDecorated() throws Exception {
+   task.triggerCheckpointAsync(new CheckpointMetaData(1, 2), new 
CheckpointOptions(CheckpointType.CHECKPOINT, new 
CheckpointStorageLocationReference(new byte[]{1})), false);
+   new Thread(() -> {
+   try {
+   Thread.sleep(50);
+   } catch (InterruptedException ex) {
+   ex.printStackTrace();
+   } finally {
+   task.mailboxProcessor.allActionsCompleted();
+   }
+   }).start();
+   task.mailboxProcessor.runMailboxLoop();
+   verify();
+   }
+
+   @Test
+   public void testMailboxExecutorIsDecorated() throws Exception {
+   CountDownLatch latch = new CountDownLatch(1);
+   
task.mailboxProcessor.getMainMailboxExecutor().asExecutor("test").execute(() -> 
{
+   try {
+   verify();
+   } finally {
+   latch.countDown();
+   }
+   });
+   new Thread(() -> {
+   try {
+   latch.await();
+   task.mailboxProcessor.allActionsCompleted();
+   } catch (Exception ex) {
+   ex.printStackTrace();
+   }
+   }).start();
+   task.mailboxProcessor.runMailboxLoop();
+   }
+
+   @Before
+   public void before() {
+   decorator = new CountingStreamTaskActionExecutor();
+   task = new StreamTask>(new 
StreamTaskTest.DeclineDummyEnvironment(), null, 
FatalExitExceptionHandler.INSTANCE, decorator) {
+   @Override
+   protected void init() {
+   }
+
+   @Override
+   protected void 

[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-03 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r353106996
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -1433,35 +1423,16 @@ protected void init() {}
 
@Override
protected void processInput(MailboxDefaultAction.Controller 
controller) throws Exception {
-   holder = new LockHolder(getCheckpointLock(), latch);
-   holder.start();
-   latch.await();
-
-   // we are at the point where cancelling can happen
-   syncLatch.trigger();
-
-   // just put this to sleep until it is interrupted
-   try {
-   Thread.sleep(1);
-   } catch (InterruptedException ignored) {
-   // restore interruption state
-   Thread.currentThread().interrupt();
+   syncLatch.trigger(); // signal that the task can be 
cancelled now
+   while (task.getExecutionState() == 
ExecutionState.RUNNING) { // wait for the containing task to be terminated from 
the outside
+   try {
+   Thread.sleep(50);
+   } catch (InterruptedException e) {
+   LOG.debug("interrupted while waiting 
for state transition", e);
+   Thread.currentThread().interrupt();
+   }
 
 Review comment:
   To my understanding, it **was** testing that the task **can** be canceled even if some thread holds the `checkpointLock`.
   
   Since that is no longer possible (and I think it shouldn't be; please correct me if I'm wrong), the test now only checks that `task.cancelExecution` leads to shutdown when the lock is not being held.
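
   The reduced test flow can be modelled roughly as follows. This is a sketch under stated assumptions, not the real test: `CancellationSketch`, its `State` enum, and `cancelLeadsToShutdown` are hypothetical, and an `AtomicReference` stands in for the task's execution state. Unlike the loop in the diff, the worker here returns when interrupted; that is a choice of this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified model of the test flow; not Flink's Task or StreamTask. */
class CancellationSketch {
    enum State { RUNNING, CANCELED }

    /** Returns true if the worker thread terminated after the state change. */
    static boolean cancelLeadsToShutdown() {
        AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
        CountDownLatch mainLoopStarted = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            mainLoopStarted.countDown(); // signal that cancellation may happen now
            while (state.get() == State.RUNNING) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // leave instead of spinning with the interrupt flag set
                }
            }
        });
        worker.start();

        try {
            mainLoopStarted.await();   // wait for the main loop to start
            state.set(State.CANCELED); // stands in for task.cancelExecution()
            worker.join(5_000);        // bounded wait so that a hang is detectable
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }
}
```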




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-03 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r353093779
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
 ##
 @@ -0,0 +1,172 @@

[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-03 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r353089282
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -318,17 +319,10 @@ public void testStateBackendClosingOnFailure() throws 
Exception {
@Test
public void testCancellationNotBlockedOnLock() throws Exception {
syncLatch = new OneShotLatch();
-
-   StreamConfig cfg = new StreamConfig(new Configuration());
-   Task task = createTask(CancelLockingTask.class, cfg, new 
Configuration());
-
-   // start the task and wait until it runs
-   // execution state RUNNING is not enough, we need to wait until 
the stream task's run() method
-   // is entered
+   task = createTask(CancelLockingTask.class, new StreamConfig(new 
Configuration()), new Configuration());
task.startTaskThread();
-   syncLatch.await();
 
-   // cancel the execution - this should lead to smooth shutdown
+   syncLatch.await(); // wait for the main loop to start
task.cancelExecution();
task.getExecutingThread().join();
 
 
 Review comment:
   You're right, addressed it.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-03 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r353081981
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
 ##
 @@ -0,0 +1,172 @@

[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-02 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352605953
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -1433,35 +1423,16 @@ protected void init() {}
 
@Override
protected void processInput(MailboxDefaultAction.Controller 
controller) throws Exception {
-   holder = new LockHolder(getCheckpointLock(), latch);
-   holder.start();
-   latch.await();
-
-   // we are at the point where cancelling can happen
-   syncLatch.trigger();
-
-   // just put this to sleep until it is interrupted
-   try {
-   Thread.sleep(1);
-   } catch (InterruptedException ignored) {
-   // restore interruption state
-   Thread.currentThread().interrupt();
+   syncLatch.trigger(); // signal that the task can be 
cancelled now
+   while (task.getExecutionState() == 
ExecutionState.RUNNING) { // wait for the containing task to be terminated from 
the outside
+   try {
+   Thread.sleep(50);
+   } catch (InterruptedException e) {
+   LOG.debug("interrupted while waiting 
for state transition", e);
+   Thread.currentThread().interrupt();
+   }
 
 Review comment:
   Yes, the previous version held the checkpoint lock in a separate thread while the main thread was closing the Task. But closing the task involves stopping the mailbox loop, which now requires the lock (to process the "poison pill").
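
   The blocked "poison pill" can be illustrated with a small sketch. It is not Flink code: `PoisonPillLockSketch` and `poisonPillProcessed` are hypothetical, and a `ReentrantLock` with a timed `tryLock` replaces the real lock only so that the example terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical illustration of a stop mail that needs the lock; not Flink code. */
class PoisonPillLockSketch {
    /** Returns true if the poison pill could be processed within the timeout. */
    static boolean poisonPillProcessed(boolean lockHeldElsewhere) {
        ReentrantLock checkpointLock = new ReentrantLock();
        CountDownLatch holderReady = new CountDownLatch(1);
        CountDownLatch releaseLock = new CountDownLatch(1);

        // Simulates a separate thread that may hold the lock during shutdown.
        Thread holder = new Thread(() -> {
            if (lockHeldElsewhere) {
                checkpointLock.lock();
            }
            holderReady.countDown();
            try {
                releaseLock.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                if (checkpointLock.isHeldByCurrentThread()) {
                    checkpointLock.unlock();
                }
            }
        });
        holder.start();

        boolean processed = false;
        try {
            holderReady.await();
            // The loop thread must acquire the lock before it may run the poison pill.
            if (checkpointLock.tryLock(200, TimeUnit.MILLISECONDS)) {
                try {
                    processed = true; // poison pill executed, the loop would stop
                } finally {
                    checkpointLock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            releaseLock.countDown(); // let the holder thread finish
        }
        return processed;
    }
}
```

   With an untimed lock acquisition, the second case below would block until the holder released the lock, which matches the hang described above.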
   
   




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-02 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352589261
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
 ##
 @@ -81,4 +86,13 @@ public boolean tryYield() {
return false;
}
}
+
+   private Mail compose(@Nonnull Runnable command, int priority, String 
descriptionFormat, Object[] descriptionArgs) {
+   return new Mail(command, priority, descriptionFormat, 
descriptionArgs) {
 
 Review comment:
   > Also, as it is now, Mail class is not used anywhere in the production 
code, which makes it a bit strange. If we want to always use actionExecutor, 
then it should be added to the Mail.
   
   Added it to `Mail`, thanks.
   
   
   > I would try to avoid wrapping Mail into a no-op actionExecutor if it's 
possible
   
   Why? 
   Until we move the `getCheckpointLock()` method to `SourceStreamTask` it is never a no-op. After that, we can consider passing null/none (though I doubt it's needed; I did a quick benchmark and didn't see any significant overhead).
   
   > Maybe shortcutting this wrapping if actionExecutor == IMMEDIATE?
   
   This could easily break.
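
   As a rough illustration of a mail that always runs through its action executor, consider the sketch below. All names (`SketchActionExecutor`, `synchronizedOn`, `SketchMail`, `SketchMailDemo`) are hypothetical simplifications and do not reproduce the signatures of Flink's `StreamTaskActionExecutor` or `Mail`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified analogue of an action executor; not Flink's interface. */
interface SketchActionExecutor {
    void run(Runnable action);

    /** Runs the action directly, without any synchronization. */
    SketchActionExecutor IMMEDIATE = Runnable::run;

    /** Runs every action while holding the given mutex. */
    static SketchActionExecutor synchronizedOn(Object mutex) {
        return action -> {
            synchronized (mutex) {
                action.run();
            }
        };
    }
}

/** Hypothetical mail that always delegates execution to its executor. */
class SketchMail {
    private final Runnable runnable;
    private final SketchActionExecutor actionExecutor;

    SketchMail(Runnable runnable, SketchActionExecutor actionExecutor) {
        this.runnable = runnable;
        this.actionExecutor = actionExecutor;
    }

    void run() {
        // No special case for IMMEDIATE: every mail takes the same path.
        actionExecutor.run(runnable);
    }
}

class SketchMailDemo {
    /** Returns how many times a counting executor saw the mail being run. */
    static int countDecoratedRuns() {
        AtomicInteger calls = new AtomicInteger();
        SketchActionExecutor counting = action -> {
            calls.incrementAndGet();
            action.run();
        };
        new SketchMail(() -> { }, counting).run();
        return calls.get();
    }
}
```

   Keeping a single code path means that a counting executor, similar in spirit to the one the decoration test uses, observes every mail.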




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-02 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352579140
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -262,7 +262,7 @@ protected StreamTask(
this.accumulatorMap = 
getEnvironment().getAccumulatorRegistry().getUserMap();
this.recordWriter = createRecordWriterDelegate(configuration, 
environment);
this.executionDecorator = 
Preconditions.checkNotNull(executionDecorator);
-   this.mailboxProcessor = new 
MailboxProcessor(this::processInput);
+   this.mailboxProcessor = new 
MailboxProcessor(this::processInput, this.executionDecorator);
 
 Review comment:
   > I think invokeProcessingTimeCallback was not addressed.
   
   Thanks, addressed earlier.
   
   
   Isn't the extra complexity introduced by having two methods doing essentially the same thing a smell?




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-02 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352573382
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
 ##
 @@ -52,8 +54,10 @@ public int getPriority() {
return priority;
}
 
-   public Runnable getRunnable() {
-   return runnable;
+   public void tryCancel(boolean mayInterruptIfRunning) {
+   if (runnable instanceof Future) {
+   ((Future) runnable).cancel(mayInterruptIfRunning);
+   }
 
 Review comment:
   Agreed, but since `runnable` is encapsulated in `Mail`, it's the responsibility of `Mail` to cancel it if possible.
   One approach that comes to mind is to construct `Mail` with a `closeCallback` along with the `Runnable`, but that seems overcomplicated.
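
   Both options can be compared in a small sketch. `CancellableMailSketch` is hypothetical and much simpler than the real `Mail`; the `closeCallback` constructor only illustrates the alternative mentioned above and is not part of the PR.

```java
import java.util.concurrent.Future;

/** Hypothetical, simplified mail; not Flink's Mail class. */
class CancellableMailSketch {
    private final Runnable runnable;
    private final Runnable closeCallback; // illustrative alternative, may be null

    CancellableMailSketch(Runnable runnable) {
        this(runnable, null);
    }

    CancellableMailSketch(Runnable runnable, Runnable closeCallback) {
        this.runnable = runnable;
        this.closeCallback = closeCallback;
    }

    /** Cancels the wrapped action if there is a known way to do so. */
    void tryCancel(boolean mayInterruptIfRunning) {
        if (closeCallback != null) {
            // Alternative: the creator of the mail decides how to cancel it.
            closeCallback.run();
        } else if (runnable instanceof Future) {
            // Approach from the diff: cancel only if the runnable is also a Future.
            ((Future<?>) runnable).cancel(mayInterruptIfRunning);
        }
        // Otherwise there is nothing to cancel.
    }
}
```

   The `instanceof` check keeps the constructor unchanged, at the cost of the mail inspecting the type of what it wraps.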




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-02 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352548956
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
 ##
 @@ -73,31 +74,30 @@ public void tearDown() {
}
 
@Test
-   public void testOperations() throws Exception {
-   final TestRunnable testRunnable = new TestRunnable();
-   mailboxExecutor.execute(testRunnable, "testRunnable");
-   Assert.assertEquals(testRunnable, 
mailbox.take(DEFAULT_PRIORITY).getRunnable());
+   public void testPutGet() throws Exception {
 
 Review comment:
   Restored the previous name `testOperations()`.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-02 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352527013
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -201,6 +201,8 @@
/** Flag to mark this task as canceled. */
 
 Review comment:
   updated commit message




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-02 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352523208
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ##
 @@ -28,13 +28,10 @@
 
 
 Review comment:
   Updated the commit message. 
   Not sure if it's relevant outside of this PR, because without wrapping the 
runnable, exposing it is probably fine.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-02 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352523111
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
 ##
 @@ -73,31 +74,30 @@ public void tearDown() {
}
 
@Test
-   public void testOperations() throws Exception {
-   final TestRunnable testRunnable = new TestRunnable();
-   mailboxExecutor.execute(testRunnable, "testRunnable");
-   Assert.assertEquals(testRunnable, 
mailbox.take(DEFAULT_PRIORITY).getRunnable());
+   public void testPutGet() throws Exception {
 
 Review comment:
   Because what is actually being tested here is the `put` and `get` operations 
on the `mailbox`.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352373397
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Verifies that {@link StreamTask} {@link StreamTaskActionExecutor decorates 
execution} of actions that potentially needs to be synchronized.
+ */
+public class StreamTaskExecutionDecorationTest {
+   private CountingStreamTaskActionExecutor decorator;
+   private StreamTask> task;
+
+   @Test
+   public void testAbortCheckpointOnBarrierIsDecorated() throws Exception {
+   task.abortCheckpointOnBarrier(1, null);
+   verify();
+   }
+
+   @Test
+   public void testTriggerCheckpointOnBarrierIsDecorated() throws 
Exception {
+   task.triggerCheckpointOnBarrier(new CheckpointMetaData(1, 2), 
new CheckpointOptions(CheckpointType.CHECKPOINT, new 
CheckpointStorageLocationReference(new byte[]{1})), null);
+   verify();
+   }
+
+   @Test
+   public void testTriggerCheckpointAsyncIsDecorated() throws Exception {
 
 Review comment:
   The test checks that `actionExecutor` is also used in 
`triggerCheckpointAsync`, outside of `triggerCheckpoint`.
   
   And I'd rather keep private methods private and test only public methods.
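
   A minimal sketch of that testing approach, assuming a counting decorator 
injected through the constructor. All names here (`ActionExecutorSketch`, 
`CountingActionExecutor`, `TaskSketch`, `DecorationCheckDemo`) are hypothetical 
and much simpler than the real `StreamTask` and `StreamTaskActionExecutor`:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class DecorationCheckDemo {
    public static void main(String[] args) {
        CountingActionExecutor decorator = new CountingActionExecutor();
        // Only the public entry point is exercised; the private helper stays private.
        new TaskSketch(decorator).triggerCheckpointAsync();
        System.out.println("decorated: " + decorator.wasCalled());
    }
}

// Hypothetical stand-in for an action executor; not Flink's interface.
interface ActionExecutorSketch {
    void run(Runnable action);
}

// Decorator that counts how many actions were routed through it.
final class CountingActionExecutor implements ActionExecutorSketch {
    private final AtomicInteger calls = new AtomicInteger();

    @Override
    public void run(Runnable action) {
        calls.incrementAndGet();
        action.run();
    }

    boolean wasCalled() {
        return calls.get() > 0;
    }
}

// Hypothetical task whose public method delegates to a private helper.
final class TaskSketch {
    private final ActionExecutorSketch actionExecutor;

    TaskSketch(ActionExecutorSketch actionExecutor) {
        this.actionExecutor = actionExecutor;
    }

    public void triggerCheckpointAsync() {
        actionExecutor.run(this::triggerCheckpoint);
    }

    private void triggerCheckpoint() {
        // checkpoint work would go here
    }
}
```

   The assertion is made on the decorator rather than on the private method, 
which keeps the test independent of how the public method is implemented.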




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352371118
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -318,17 +319,10 @@ public void testStateBackendClosingOnFailure() throws 
Exception {
@Test
public void testCancellationNotBlockedOnLock() throws Exception {
syncLatch = new OneShotLatch();
-
-   StreamConfig cfg = new StreamConfig(new Configuration());
-   Task task = createTask(CancelLockingTask.class, cfg, new 
Configuration());
-
-   // start the task and wait until it runs
-   // execution state RUNNING is not enough, we need to wait until 
the stream task's run() method
-   // is entered
+   task = createTask(CancelLockingTask.class, new StreamConfig(new 
Configuration()), new Configuration());
task.startTaskThread();
-   syncLatch.await();
 
-   // cancel the execution - this should lead to smooth shutdown
+   syncLatch.await(); // wait for the main loop to start
task.cancelExecution();
task.getExecutingThread().join();
 
 
 Review comment:
   Improving existing tests is out of scope of this PR.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352372889
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Verifies that {@link StreamTask} {@link StreamTaskActionExecutor decorates 
execution} of actions that potentially needs to be synchronized.
+ */
+public class StreamTaskExecutionDecorationTest {
+   private CountingStreamTaskActionExecutor decorator;
+   private StreamTask> task;
+
+   @Test
+   public void testAbortCheckpointOnBarrierIsDecorated() throws Exception {
+   task.abortCheckpointOnBarrier(1, null);
+   verify();
+   }
+
+   @Test
+   public void testTriggerCheckpointOnBarrierIsDecorated() throws 
Exception {
+   task.triggerCheckpointOnBarrier(new CheckpointMetaData(1, 2), 
new CheckpointOptions(CheckpointType.CHECKPOINT, new 
CheckpointStorageLocationReference(new byte[]{1})), null);
+   verify();
+   }
+
+   @Test
+   public void testTriggerCheckpointAsyncIsDecorated() throws Exception {
+   task.triggerCheckpointAsync(new CheckpointMetaData(1, 2), new 
CheckpointOptions(CheckpointType.CHECKPOINT, new 
CheckpointStorageLocationReference(new byte[]{1})), false);
+   new Thread(() -> {
+   try {
+   Thread.sleep(50);
+   } catch (InterruptedException ex) {
+   ex.printStackTrace();
+   } finally {
+   task.mailboxProcessor.allActionsCompleted();
+   }
+   }).start();
+   task.mailboxProcessor.runMailboxLoop();
+   verify();
+   }
+
+   @Test
+   public void testMailboxExecutorIsDecorated() throws Exception {
+   CountDownLatch latch = new CountDownLatch(1);
+   
task.mailboxProcessor.getMainMailboxExecutor().asExecutor("test").execute(() -> 
{
+   try {
+   verify();
+   } finally {
+   latch.countDown();
+   }
+   });
+   new Thread(() -> {
+   try {
+   latch.await();
+   task.mailboxProcessor.allActionsCompleted();
+   } catch (Exception ex) {
+   ex.printStackTrace();
+   }
+   }).start();
+   task.mailboxProcessor.runMailboxLoop();
+   }
+
+   @Before
+   public void before() {
+   decorator = new CountingStreamTaskActionExecutor();
+   task = new StreamTask>(new 
StreamTaskTest.DeclineDummyEnvironment(), null, 
FatalExitExceptionHandler.INSTANCE, decorator) {
 
 Review comment:
   There is no `operatorChain` for the `MockStreamTask`, and it shouldn't call 
`processInput()`.



[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352371372
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Verifies that {@link StreamTask} {@link StreamTaskActionExecutor decorates 
execution} of actions that potentially needs to be synchronized.
+ */
+public class StreamTaskExecutionDecorationTest {
+   private CountingStreamTaskActionExecutor decorator;
+   private StreamTask> task;
+
+   @Test
+   public void testAbortCheckpointOnBarrierIsDecorated() throws Exception {
+   task.abortCheckpointOnBarrier(1, null);
+   verify();
+   }
+
+   @Test
+   public void testTriggerCheckpointOnBarrierIsDecorated() throws 
Exception {
+   task.triggerCheckpointOnBarrier(new CheckpointMetaData(1, 2), 
new CheckpointOptions(CheckpointType.CHECKPOINT, new 
CheckpointStorageLocationReference(new byte[]{1})), null);
+   verify();
+   }
+
+   @Test
+   public void testTriggerCheckpointAsyncIsDecorated() throws Exception {
+   task.triggerCheckpointAsync(new CheckpointMetaData(1, 2), new 
CheckpointOptions(CheckpointType.CHECKPOINT, new 
CheckpointStorageLocationReference(new byte[]{1})), false);
+   new Thread(() -> {
+   try {
+   Thread.sleep(50);
+   } catch (InterruptedException ex) {
+   ex.printStackTrace();
+   } finally {
+   task.mailboxProcessor.allActionsCompleted();
+   }
+   }).start();
+   task.mailboxProcessor.runMailboxLoop();
+   verify();
+   }
+
+   @Test
+   public void testMailboxExecutorIsDecorated() throws Exception {
+   CountDownLatch latch = new CountDownLatch(1);
+   
task.mailboxProcessor.getMainMailboxExecutor().asExecutor("test").execute(() -> 
{
+   try {
+   verify();
+   } finally {
+   latch.countDown();
+   }
+   });
+   new Thread(() -> {
+   try {
+   latch.await();
+   task.mailboxProcessor.allActionsCompleted();
+   } catch (Exception ex) {
+   ex.printStackTrace();
+   }
+   }).start();
+   task.mailboxProcessor.runMailboxLoop();
+   }
+
+   @Before
+   public void before() {
+   decorator = new CountingStreamTaskActionExecutor();
+   task = new StreamTask>(new 
StreamTaskTest.DeclineDummyEnvironment(), null, 
FatalExitExceptionHandler.INSTANCE, decorator) {
+   @Override
+   protected void init() {
+   }
+
+   @Override
+   protected void 

[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352371330
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Verifies that {@link StreamTask} {@link StreamTaskActionExecutor decorates 
execution} of actions that potentially needs to be synchronized.
+ */
+public class StreamTaskExecutionDecorationTest {
+   private CountingStreamTaskActionExecutor decorator;
+   private StreamTask> task;
+
+   @Test
+   public void testAbortCheckpointOnBarrierIsDecorated() throws Exception {
+   task.abortCheckpointOnBarrier(1, null);
+   verify();
+   }
+
+   @Test
+   public void testTriggerCheckpointOnBarrierIsDecorated() throws 
Exception {
+   task.triggerCheckpointOnBarrier(new CheckpointMetaData(1, 2), 
new CheckpointOptions(CheckpointType.CHECKPOINT, new 
CheckpointStorageLocationReference(new byte[]{1})), null);
+   verify();
+   }
+
+   @Test
+   public void testTriggerCheckpointAsyncIsDecorated() throws Exception {
+   task.triggerCheckpointAsync(new CheckpointMetaData(1, 2), new 
CheckpointOptions(CheckpointType.CHECKPOINT, new 
CheckpointStorageLocationReference(new byte[]{1})), false);
+   new Thread(() -> {
+   try {
+   Thread.sleep(50);
+   } catch (InterruptedException ex) {
+   ex.printStackTrace();
+   } finally {
+   task.mailboxProcessor.allActionsCompleted();
+   }
+   }).start();
+   task.mailboxProcessor.runMailboxLoop();
+   verify();
+   }
+
+   @Test
+   public void testMailboxExecutorIsDecorated() throws Exception {
+   CountDownLatch latch = new CountDownLatch(1);
+   
task.mailboxProcessor.getMainMailboxExecutor().asExecutor("test").execute(() -> 
{
+   try {
+   verify();
+   } finally {
+   latch.countDown();
+   }
+   });
+   new Thread(() -> {
+   try {
+   latch.await();
+   task.mailboxProcessor.allActionsCompleted();
+   } catch (Exception ex) {
+   ex.printStackTrace();
+   }
+   }).start();
+   task.mailboxProcessor.runMailboxLoop();
+   }
+
+   @Before
+   public void before() {
+   decorator = new CountingStreamTaskActionExecutor();
+   task = new StreamTask>(new 
StreamTaskTest.DeclineDummyEnvironment(), null, 
FatalExitExceptionHandler.INSTANCE, decorator) {
+   @Override
+   protected void init() {
+   }
+
+   @Override
+   protected void 


[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352371032
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -152,7 +160,7 @@ public void runMailboxLoop() throws Exception {
final MailboxController defaultActionContext = new 
MailboxController(this);
 
while (processMail(localMailbox)) {
-   
mailboxDefaultAction.runDefaultAction(defaultActionContext);
+   
mailboxDefaultAction.runDefaultAction(defaultActionContext); // currently, lock 
is acquired inside each action
 
 Review comment:
   I probably misworded the comment (updated).
   The lock is acquired inside the `default action` as needed, not in a uniform 
fashion.
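
   A rough, single-threaded sketch of that distinction, under the assumption 
that mail is synchronized by the loop while the default action takes the lock 
itself. `MailboxLoopSketch` and its fields are hypothetical and do not mirror 
the real `MailboxProcessor`:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical single-threaded sketch; not Flink's MailboxProcessor.
final class MailboxLoopSketch {
    private final Object lock = new Object();
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private int remainingInputs;
    int processed; // shared state, only touched while holding the lock
    int mailsRun;

    MailboxLoopSketch(int inputs) {
        this.remainingInputs = inputs;
    }

    void put(Runnable mail) {
        mailbox.add(mail);
    }

    // The default action decides for itself when the lock is needed.
    private boolean runDefaultAction() {
        if (remainingInputs == 0) {
            return false; // no more input: ask the loop to stop
        }
        remainingInputs--; // unguarded part of the work
        synchronized (lock) {
            processed++; // guarded part of the work
        }
        return true;
    }

    void runMailboxLoop() {
        boolean hasMoreInput = true;
        while (hasMoreInput) {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                synchronized (lock) { // mail is synchronized uniformly by the loop
                    mail.run();
                    mailsRun++;
                }
            }
            hasMoreInput = runDefaultAction(); // called without taking the lock here
        }
    }

    public static void main(String[] args) {
        MailboxLoopSketch loop = new MailboxLoopSketch(3);
        loop.put(() -> System.out.println("mail executed"));
        loop.runMailboxLoop();
        System.out.println("processed=" + loop.processed + ", mailsRun=" + loop.mailsRun);
    }
}
```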




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352369019
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -1486,22 +1486,17 @@ private void handleTimerException(Exception ex) {
}
 
private ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor 
mailboxExecutor, ProcessingTimeCallback callback) {
-   return timestamp -> {
-   mailboxExecutor.execute(
-   () -> invokeProcessingTimeCallback(callback, 
timestamp),
-   "Timer callback for %s @ %d",
-   callback,
-   timestamp);
-   };
+   return timestamp ->
+   mailboxExecutor.execute(() -> {
+   try {
 
 Review comment:
   Not relevant anymore, removed, thanks.
   
   Not sure, though, which is more complicated :)




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352370977
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
 ##
 @@ -37,9 +39,12 @@
 
private final int priority;
 
-   public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority) {
+   public final StreamTaskActionExecutor actionExecutor;
 
 Review comment:
   > Why is this public?
   
   Fixed, thanks.
   
   
   The intent of this PR is to:
   1. make the use of the mailbox executor safe without the `checkpoint lock`
   2. prepare for pushing the `checkpoint lock` down to `SourceStreamTask`
   
   Having a custom mailbox processor in `SourceStreamTask` would still require 
synchronizing some actions in `StreamTask`.
   As for performance, `actionExecutor` is not used on the hot path, i.e. the 
`default action`.
   (and I'd love to see any data on how such a design (a method call stack of 
depth 2 instead of 1) affects Flink performance)
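
   To illustrate the extra call depth being discussed, here is a minimal 
sketch of an action executor with an immediate and a lock-holding strategy. 
The interface name `ActionExecutorStrategy`, the factory `synchronizedOn` and 
the demo class are assumptions made for this example, not the PR's actual API:

```java
final class ActionExecutorStrategyDemo {
    public static void main(String[] args) {
        Object checkpointLock = new Object();
        ActionExecutorStrategy.synchronizedOn(checkpointLock).run(
            () -> System.out.println("holds lock: " + Thread.holdsLock(checkpointLock)));
        ActionExecutorStrategy.IMMEDIATE.run(
            () -> System.out.println("holds lock: " + Thread.holdsLock(checkpointLock)));
    }
}

// Hypothetical executor abstraction; names are illustrative only.
interface ActionExecutorStrategy {
    void run(Runnable action);

    // Runs the action directly, without any synchronization.
    ActionExecutorStrategy IMMEDIATE = Runnable::run;

    // Runs every action while holding the given lock.
    static ActionExecutorStrategy synchronizedOn(Object lock) {
        return action -> {
            synchronized (lock) {
                action.run();
            }
        };
    }
}
```

   Either strategy adds one level of indirection per action, which is the 
"depth 2 instead of 1" cost mentioned above; whether it is measurable would 
need benchmarking.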
   
   




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352369808
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -152,7 +160,7 @@ public void runMailboxLoop() throws Exception {
final MailboxController defaultActionContext = new 
MailboxController(this);
 
while (processMail(localMailbox)) {
-   
mailboxDefaultAction.runDefaultAction(defaultActionContext);
+   
mailboxDefaultAction.runDefaultAction(defaultActionContext); // currently, lock 
is acquired inside each action
 
 Review comment:
   I probably misworded the comment (updated).
   The lock is acquired inside the `default action` as needed, not in a uniform 
fashion.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352370761
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -318,17 +319,10 @@ public void testStateBackendClosingOnFailure() throws 
Exception {
@Test
public void testCancellationNotBlockedOnLock() throws Exception {
syncLatch = new OneShotLatch();
-
-   StreamConfig cfg = new StreamConfig(new Configuration());
-   Task task = createTask(CancelLockingTask.class, cfg, new 
Configuration());
-
-   // start the task and wait until it runs
-   // execution state RUNNING is not enough, we need to wait until 
the stream task's run() method
-   // is entered
+   task = createTask(CancelLockingTask.class, new StreamConfig(new 
Configuration()), new Configuration());
task.startTaskThread();
-   syncLatch.await();
 
-   // cancel the execution - this should lead to smooth shutdown
+   syncLatch.await(); // wait for the main loop to start
task.cancelExecution();
task.getExecutingThread().join();
 
 
 Review comment:
   Improving the tests is not in the scope of this PR.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352368931
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -1486,22 +1486,17 @@ private void handleTimerException(Exception ex) {
}
 
private ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor 
mailboxExecutor, ProcessingTimeCallback callback) {
-   return timestamp -> {
 
 Review comment:
   Not relevant anymore, removed, thanks.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352369808
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -152,7 +160,7 @@ public void runMailboxLoop() throws Exception {
final MailboxController defaultActionContext = new MailboxController(this);
 
while (processMail(localMailbox)) {
-   mailboxDefaultAction.runDefaultAction(defaultActionContext);
+   mailboxDefaultAction.runDefaultAction(defaultActionContext); // currently, lock is acquired inside each action
 
 Review comment:
   I probably misworded the comment (now updated).
   The lock is acquired inside the `default action` as needed, not in a uniform 
fashion.
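
   To make the distinction concrete, here is a minimal sketch. `MailboxLoopSketch` and everything in it is hypothetical and much simpler than the real `MailboxProcessor`; it assumes that mail is run under the lock uniformly, while the default action is called without it and locks only around the part that needs it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, heavily simplified mailbox loop; not Flink's MailboxProcessor.
final class MailboxLoopSketch {
    final Object lock = new Object();
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    // Records whether the current thread held the lock at each step.
    final List<String> trace = new ArrayList<>();

    // One iteration: drain the mail under the lock, then run the default action without it.
    void runOnce(Runnable defaultAction) {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            synchronized (lock) { // uniform: every mail runs under the lock
                mail.run();
            }
        }
        defaultAction.run(); // not wrapped: the action locks internally where it needs to
    }

    static List<String> demo() {
        MailboxLoopSketch loop = new MailboxLoopSketch();
        loop.mailbox.add(() -> loop.trace.add("mail locked=" + Thread.holdsLock(loop.lock)));
        loop.runOnce(() -> {
            loop.trace.add("default locked=" + Thread.holdsLock(loop.lock));
            synchronized (loop.lock) { // acquired only around the part that needs it
                loop.trace.add("critical locked=" + Thread.holdsLock(loop.lock));
            }
        });
        return loop.trace;
    }
}
```

   `MailboxLoopSketch.demo()` returns the trace, so the difference between the two code paths can be inspected directly.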




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352367128
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExecutionDecorator.java
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Wraps execution of a {@link Runnable}, {@link ThrowingRunnable}, {@link Callable}, or {@link Mail}.
+ * Intended to customize execution in sub-types of {@link org.apache.flink.streaming.runtime.tasks.StreamTask StreamTask},
+ * e.g. synchronization in {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask SourceStreamTask}.
+ */
+@Internal
+public interface ExecutionDecorator {
+   void run(Runnable runnable);
+
+   <E extends Throwable> void runThrowing(ThrowingRunnable<E> runnable) throws E;
+
+   <R> R call(Callable<R> callable) throws Exception;
+
+   void dispatch(Mail mail);
+
+   ExecutionDecorator NO_OP = new ExecutionDecorator() {
+   @Override
+   public void run(Runnable runnable) {
+   runnable.run();
+   }
+
+   @Override
+   public <E extends Throwable> void runThrowing(ThrowingRunnable<E> runnable) throws E {
+   runnable.run();
+   }
+
+   @Override
+   public <R> R call(Callable<R> callable) throws Exception {
+   return callable.call();
+   }
+
+   @Override
+   public void dispatch(Mail mail) {
+   mail.run();
+   }
+   };
+
+   /**
+* Returns an ExecutionDecorator that synchronizes each invocation.
+*/
+   static SynchronizedExecutionDecorator syncExecutionDecorator() {
+   return syncExecutionDecorator(new Object());
+   }
+
+   /**
+* Returns an ExecutionDecorator that synchronizes each invocation on a 
given object.
+*/
+   static SynchronizedExecutionDecorator syncExecutionDecorator(Object 
mutex) {
+   return new SynchronizedExecutionDecorator(mutex);
+   }
+
+   /**
+* A {@link ExecutionDecorator} that synchronizes every operation on 
the provided mutex.
+*/
+   class SynchronizedExecutionDecorator implements ExecutionDecorator {
 
 Review comment:
   We still have the `getCheckpointLock()` method in `StreamTask`, and `private 
Object checkpointLock` or `private Object lock` fields in lots of places. 
   The only change this PR introduces here is that `getCheckpointLock()` now 
delegates to the other class.
   
   What do you mean by "technical term"?
   A `mutex` field provides **mut**ual **ex**clusion for some sections of code, 
whereas a `lock` can, for example, be shared. So `mutex` gives a little more context.
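
   As a rough illustration of that delegation, consider the sketch below. `SyncDecoratorSketch`, `getMutex()`, and `TaskSketch` are hypothetical names, not the classes in this PR; the only point is that `getCheckpointLock()` hands out the very object the decorator synchronizes on.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a synchronizing decorator; not the PR's actual class.
final class SyncDecoratorSketch {
    private final Object mutex;

    SyncDecoratorSketch(Object mutex) {
        this.mutex = mutex;
    }

    // Runs the action while holding the mutex and returns its result.
    <R> R call(Supplier<R> action) {
        synchronized (mutex) {
            return action.get();
        }
    }

    // Exposed so that legacy callers can keep synchronizing on the same object.
    Object getMutex() {
        return mutex;
    }
}

// Hypothetical task: the lock accessor survives but merely delegates.
final class TaskSketch {
    private final SyncDecoratorSketch decorator = new SyncDecoratorSketch(new Object());

    Object getCheckpointLock() {
        return decorator.getMutex();
    }

    // True if a decorated action sees the checkpoint lock as held by the current thread.
    boolean actionRunsUnderCheckpointLock() {
        return decorator.call(() -> Thread.holdsLock(getCheckpointLock()));
    }
}
```

   With this shape, code that still does `synchronized (task.getCheckpointLock())` and code that goes through the decorator exclude each other, because both use one monitor.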




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-12-01 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352365599
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -148,11 +148,7 @@
 
// 

 
-   /**
-* All interaction with the {@code StreamOperator} must be synchronized 
on this lock object to
-* ensure that we don't have concurrent method calls that void 
consistent checkpoints.
-*/
-   private final Object lock = new Object();
+   private final ExecutionDecorator.SynchronizedExecutionDecorator 
executionDecorator;
 
 Review comment:
   It will become the interface type once the `getCheckpointLock()` method is 
removed, hopefully in the next PR.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-29 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r352028511
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExecutionDecorator.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Wraps execution of a {@link Runnable}, {@link ThrowingRunnable}, {@link Callable}, or {@link Mail}.
+ * Intended to customize execution in sub-types of {@link org.apache.flink.streaming.runtime.tasks.StreamTask StreamTask},
+ * e.g. synchronization in {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask SourceStreamTask}.
+ */
+public interface ExecutionDecorator {
 
 Review comment:
   To me, `Executor` is usually related to threads (e.g. in 
`java.util.concurrent`). While a same-thread implementation might be available, 
that is usually not the intent behind creating the interface. Here the intent is 
to decorate each execution, and both the design pattern and the interface name 
should reflect the purpose of the abstraction.
   
   But it looks like we won't reach consensus in the foreseeable future :) So I 
renamed it to `StreamTaskActionExecutor` and its ex-`NO_OP` implementation to 
`ImmediateExecStreamTaskActionExecutor`.
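
   For readers following the naming argument, a small sketch of the "decorate, don't schedule" idea may help. `ActionDecorator`, `IMMEDIATE`, and `synchronizedOn` are hypothetical names used only for illustration: both implementations run the action on the caller's thread, and the second one merely wraps it in a `synchronized` block.

```java
// Hypothetical, single-method version of the abstraction discussed above.
interface ActionDecorator {
    void run(Runnable action);

    // Runs the action as is, on the calling thread.
    ActionDecorator IMMEDIATE = Runnable::run;

    // Runs the action on the calling thread while holding the given mutex.
    static ActionDecorator synchronizedOn(Object mutex) {
        return action -> {
            synchronized (mutex) {
                action.run();
            }
        };
    }
}

final class DecoratorThreadDemo {
    // Returns the thread on which the decorated action actually ran.
    static Thread threadUsedBy(ActionDecorator decorator) {
        Thread[] seen = new Thread[1];
        decorator.run(() -> seen[0] = Thread.currentThread());
        return seen[0];
    }
}
```

   Neither variant hands the action to another thread, which is the behaviour the name `Executor` tends to suggest.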
   
   




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351909177
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
 ##
 @@ -37,9 +39,12 @@
 
private final int priority;
 
-   public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority) {
+   public final ExecutionDecorator executionDecorator;
+
+   public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, 
ExecutionDecorator executionDecorator) {
 
 Review comment:
   Not all wrapped actions are `Mail`s, so we would end up either wrapping those 
in `Mail` as well or having different approaches for executing `Mail`s and 
regular actions.
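
   A minimal sketch of the alternative being defended here, with one wrapper offering an entry point per kind of action. `MailSketch` and `ActionWrapperSketch` are hypothetical stand-ins and are much simpler than the real `Mail` and decorator classes.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a mailbox letter; the real Mail class has more to it.
final class MailSketch {
    private final Runnable body;

    MailSketch(Runnable body) {
        this.body = body;
    }

    void run() {
        body.run();
    }
}

// Hypothetical wrapper: one entry point per kind of action, one mutex for all of them.
final class ActionWrapperSketch {
    private final Object mutex = new Object();

    void run(Runnable action) {
        synchronized (mutex) {
            action.run();
        }
    }

    <R> R call(Callable<R> action) throws Exception {
        synchronized (mutex) {
            return action.call();
        }
    }

    void dispatch(MailSketch mail) {
        synchronized (mutex) {
            mail.run();
        }
    }

    // Sends a plain action, a mail, and a value-returning action through the same wrapper.
    static String demo() {
        ActionWrapperSketch wrapper = new ActionWrapperSketch();
        StringBuilder out = new StringBuilder();
        wrapper.run(() -> out.append("runnable;"));
        wrapper.dispatch(new MailSketch(() -> out.append("mail;")));
        try {
            out.append(wrapper.call(() -> "callable"));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return out.toString();
    }
}
```

   Plain actions and value-returning actions go through the same wrapper without first being converted into mail.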




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351914910
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -262,7 +262,7 @@ protected StreamTask(
this.accumulatorMap = 
getEnvironment().getAccumulatorRegistry().getUserMap();
this.recordWriter = createRecordWriterDelegate(configuration, 
environment);
this.executionDecorator = 
Preconditions.checkNotNull(executionDecorator);
-   this.mailboxProcessor = new 
MailboxProcessor(this::processInput);
+   this.mailboxProcessor = new 
MailboxProcessor(this::processInput, this.executionDecorator);
 
 Review comment:
   I'm inlining `invokeProcessingTimeCallback` and removing the synchronization 
inside it.
   As for `performCheckpoint()`, I don't think removing the synchronization is 
safe, because the method is reachable via public methods. I think we should 
check the difference in benchmarks first.
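
   The concern can be sketched as follows. `CheckpointSketch` is a hypothetical class, not Flink's checkpointing code: a method that is reachable both from an already-synchronized path and from a public entry point keeps its own `synchronized` block, which is harmless on the first path because Java monitors are reentrant.

```java
// Hypothetical illustration; not Flink's actual checkpointing code.
final class CheckpointSketch {
    private final Object lock = new Object();
    private int completed;

    // Public entry point: the caller may arrive without holding the lock.
    public void triggerDirectly() {
        performCheckpoint();
    }

    // Path that already holds the lock, e.g. an action run by a synchronizing wrapper.
    public void triggerFromWrappedAction() {
        synchronized (lock) {
            performCheckpoint();
        }
    }

    private void performCheckpoint() {
        synchronized (lock) { // reentrant, so this is safe when the lock is already held
            completed++;
        }
    }

    public int completed() {
        synchronized (lock) {
            return completed;
        }
    }
}
```

   Whether the nested acquisition costs anything measurable is exactly what a benchmark would have to show.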




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351909719
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExecutionDecorator.java
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Wraps execution of a {@link Runnable}, {@link ThrowingRunnable}, {@link Callable}, or {@link Mail}.
+ * Intended to customize execution in sub-types of {@link org.apache.flink.streaming.runtime.tasks.StreamTask StreamTask},
+ * e.g. synchronization in {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask SourceStreamTask}.
+ */
+@Internal
+public interface ExecutionDecorator {
+   void run(Runnable runnable);
+
+   <E extends Throwable> void runThrowing(ThrowingRunnable<E> runnable) throws E;
+
+   <R> R call(Callable<R> callable) throws Exception;
+
+   void dispatch(Mail mail);
+
+   ExecutionDecorator NO_OP = new ExecutionDecorator() {
+   @Override
+   public void run(Runnable runnable) {
+   runnable.run();
+   }
+
+   @Override
+   public <E extends Throwable> void runThrowing(ThrowingRunnable<E> runnable) throws E {
+   runnable.run();
+   }
+
+   @Override
+   public <R> R call(Callable<R> callable) throws Exception {
+   return callable.call();
+   }
+
+   @Override
+   public void dispatch(Mail mail) {
+   mail.run();
+   }
+   };
+
+   /**
+* Returns an ExecutionDecorator that synchronizes each invocation.
+*/
+   static SynchronizedExecutionDecorator syncExecutionDecorator() {
+   return syncExecutionDecorator(new Object());
+   }
+
+   /**
+* Returns an ExecutionDecorator that synchronizes each invocation on a 
given object.
+*/
+   static SynchronizedExecutionDecorator syncExecutionDecorator(Object 
mutex) {
+   return new SynchronizedExecutionDecorator(mutex);
+   }
+
+   /**
+* A {@link ExecutionDecorator} that synchronizes every operation on 
the provided mutex.
+*/
+   class SynchronizedExecutionDecorator implements ExecutionDecorator {
 
 Review comment:
   On the other hand, `checkpointLock` is confusing for newcomers :) since it is 
used not only for checkpoints.
   And the method `getCheckpointLock()` is still present. So I think we can 
just add Javadoc for now, and mention this in a later commit.
   




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351909719
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExecutionDecorator.java
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Wraps execution of a {@link Runnable}, {@link ThrowingRunnable}, {@link Callable}, or {@link Mail}.
+ * Intended to customize execution in sub-types of {@link org.apache.flink.streaming.runtime.tasks.StreamTask StreamTask},
+ * e.g. synchronization in {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask SourceStreamTask}.
+ */
+@Internal
+public interface ExecutionDecorator {
+   void run(Runnable runnable);
+
+   <E extends Throwable> void runThrowing(ThrowingRunnable<E> runnable) throws E;
+
+   <R> R call(Callable<R> callable) throws Exception;
+
+   void dispatch(Mail mail);
+
+   ExecutionDecorator NO_OP = new ExecutionDecorator() {
+   @Override
+   public void run(Runnable runnable) {
+   runnable.run();
+   }
+
+   @Override
+   public <E extends Throwable> void runThrowing(ThrowingRunnable<E> runnable) throws E {
+   runnable.run();
+   }
+
+   @Override
+   public <R> R call(Callable<R> callable) throws Exception {
+   return callable.call();
+   }
+
+   @Override
+   public void dispatch(Mail mail) {
+   mail.run();
+   }
+   };
+
+   /**
+* Returns an ExecutionDecorator that synchronizes each invocation.
+*/
+   static SynchronizedExecutionDecorator syncExecutionDecorator() {
+   return syncExecutionDecorator(new Object());
+   }
+
+   /**
+* Returns an ExecutionDecorator that synchronizes each invocation on a 
given object.
+*/
+   static SynchronizedExecutionDecorator syncExecutionDecorator(Object 
mutex) {
+   return new SynchronizedExecutionDecorator(mutex);
+   }
+
+   /**
+* A {@link ExecutionDecorator} that synchronizes every operation on 
the provided mutex.
+*/
+   class SynchronizedExecutionDecorator implements ExecutionDecorator {
 
 Review comment:
   On the other hand, `checkpointLock` is confusing for newcomers :) since it is 
used not only for checkpoints.
   I think we can just add Javadoc.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351909177
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
 ##
 @@ -37,9 +39,12 @@
 
private final int priority;
 
-   public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority) {
+   public final ExecutionDecorator executionDecorator;
+
+   public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, 
ExecutionDecorator executionDecorator) {
 
 Review comment:
   Not all wrapped actions are `Mail`s, so we would end up either wrapping those 
in `Mail` as well or having different approaches for executing `Mail`s and 
regular actions.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351811427
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -148,11 +148,7 @@
 
// 

 
-   /**
-* All interaction with the {@code StreamOperator} must be synchronized 
on this lock object to
-* ensure that we don't have concurrent method calls that void 
consistent checkpoints.
-*/
-   private final Object lock = new Object();
+   private final ExecutionDecorator.SynchronizedExecutionDecorator 
executionDecorator;
 
 Review comment:
   Why? It spreads the responsibility of managing the lock.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351804282
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -384,57 +386,59 @@ protected Counter 
setupNumRecordsInCounter(StreamOperator streamOperator) {
}
}
 
-   @Override
-   public final void invoke() throws Exception {
+   private void beforeInvoke() throws Exception {
 
 Review comment:
   This would require changes to most of the StreamTask descendants, which are 
completely unrelated to the current task. I'd rather keep the scope of changes 
limited.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351738365
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExecutionDecorator.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Wraps execution of a {@link Runnable}, {@link ThrowingRunnable}, {@link Callable}, or {@link Mail}.
+ * Intended to customize execution in sub-types of {@link org.apache.flink.streaming.runtime.tasks.StreamTask StreamTask},
+ * e.g. synchronization in {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask SourceStreamTask}.
+ */
+public interface ExecutionDecorator {
+   void run(Runnable runnable);
+
+   <E extends Throwable> void runThrowing(ThrowingRunnable<E> runnable) throws E;
 
 Review comment:
   I'd prefer such changes to be explicit, and the method name to reflect the 
difference.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351737446
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -148,11 +148,7 @@
 
// 

 
-   /**
-* All interaction with the {@code StreamOperator} must be synchronized 
on this lock object to
-* ensure that we don't have concurrent method calls that void 
consistent checkpoints.
-*/
-   private final Object lock = new Object();
+   private final ExecutionDecorator.SynchronizedExecutionDecorator 
executionDecorator;
 
 Review comment:
   SynchronizedExecutionDecorator gives access to its lock object.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351714937
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExecutionDecorator.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Wraps execution of a {@link Runnable}, {@link ThrowingRunnable}, {@link Callable}, or {@link Mail}.
+ * Intended to customize execution in sub-types of {@link org.apache.flink.streaming.runtime.tasks.StreamTask StreamTask},
+ * e.g. synchronization in {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask SourceStreamTask}.
+ */
+public interface ExecutionDecorator {
 
 Review comment:
   > doesn't include the name of the design pattern
   
   Why not? Having it in the name makes its purpose clear straight away.
   `Executor` and `Synchronizer` are incorrect, as it neither executes nor 
necessarily synchronizes.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351736589
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExecutionDecorator.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Wraps execution of a {@link Runnable}, {@link ThrowingRunnable}, {@link Callable}, or {@link Mail}.
+ * Intended to customize execution in sub-types of {@link org.apache.flink.streaming.runtime.tasks.StreamTask StreamTask},
+ * e.g. synchronization in {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask SourceStreamTask}.
+ */
+public interface ExecutionDecorator {
+   void run(Runnable runnable);
+
+   <E extends Throwable> void runThrowing(ThrowingRunnable<E> runnable) throws E;
+
+   <R> R call(Callable<R> callable) throws Exception;
+
+   void dispatch(Mail mail);
+
+   ExecutionDecorator NOP = new ExecutionDecorator() {
 
 Review comment:
   Yes, `NOP` means "no operation". `Unsynchronized` sounds too specific to me.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351714937
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExecutionDecorator.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Wraps execution of a {@link Runnable}, {@link ThrowingRunnable}, {@link Callable}, or {@link Mail}.
+ * Intended to customize execution in sub-types of {@link org.apache.flink.streaming.runtime.tasks.StreamTask StreamTask},
+ * e.g. synchronization in {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask SourceStreamTask}.
+ */
+public interface ExecutionDecorator {
 
 Review comment:
   > doesn't include the name of the design pattern
   
   Why not? Having it in the name makes its purpose clear straight away.
   Executor and Synchronizer are incorrect, as it neither executes nor necessarily synchronizes.




[GitHub] [flink] rkhachatryan commented on a change in pull request #10345: [FLINK-12484][runtime] synchronize all mailbox actions

2019-11-28 Thread GitBox
rkhachatryan commented on a change in pull request #10345: 
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351712514
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -384,57 +386,59 @@ protected Counter setupNumRecordsInCounter(StreamOperator streamOperator) {
}
}
 
-   @Override
-   public final void invoke() throws Exception {
+   private void beforeInvoke() throws Exception {
 
 Review comment:
   Given that there is already an `init()`, adding something like `setup()` or `prepare()` would be confusing. Do you have something specific in mind?
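
   For context, a rough sketch of how the names sit next to each other. This is a hypothetical, heavily simplified class, not the real `StreamTask`: `runLoop()` and `afterInvoke()` are assumed here purely for illustration, and only `init()`, `invoke()` and `beforeInvoke()` appear in the diff above. The point is that `beforeInvoke()` names a phase of `invoke()`, whereas `setup()` or `prepare()` would read as a second subclass hook competing with `init()`.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical, simplified task; not the real StreamTask. */
    abstract static class SketchTask {
        final List<String> calls = new ArrayList<>();

        /** Subclass hook, analogous in spirit to the existing init(). */
        protected abstract void init() throws Exception;

        /** Internal phase: everything that happens before the main loop, including init(). */
        private void beforeInvoke() throws Exception {
            calls.add("beforeInvoke");
            init();
        }

        /** Assumed placeholder for the main processing loop. */
        private void runLoop() {
            calls.add("runLoop");
        }

        /** Assumed placeholder for the teardown phase. */
        private void afterInvoke() {
            calls.add("afterInvoke");
        }

        public final void invoke() throws Exception {
            beforeInvoke();
            runLoop();
            afterInvoke();
        }
    }

    /** Runs one task and returns the order in which the phases were called. */
    static List<String> demo() throws Exception {
        SketchTask task = new SketchTask() {
            @Override
            protected void init() {
                calls.add("init");
            }
        };
        task.invoke();
        return task.calls;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```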

