[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-28 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318543002
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 ##
 @@ -286,13 +334,12 @@ public void setup(TypeSerializer outputSerializer) {
createStreamTaskStateManager(environment, 
stateBackend, processingTimeService);

mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);
 
-   if (operator instanceof SetupableStreamOperator) {
+   if (operator == null) {
 
 Review comment:
   These changes overwrite lines introduced in the previous commit, so 
something is not quite right here.
   
   Shouldn't some or all of the changes in this commit that touch the test 
harnesses be squashed/moved into the previous commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-28 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318419679
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ##
 @@ -148,12 +150,17 @@ public OperatorChain(
chainedConfigs,
userCodeClassloader,
streamOutputMap,
-   allOps);
+   allOps,
 
 Review comment:
   nit: formatting (the same issue appears in other places in this commit)




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-28 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318420614
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 ##
 @@ -283,13 +287,23 @@ public void setup(TypeSerializer outputSerializer) {
streamTaskStateInitializer =
createStreamTaskStateManager(environment, 
stateBackend, processingTimeService);

mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);
+
if (operator instanceof SetupableStreamOperator) {
((SetupableStreamOperator) 
operator).setup(mockTask, config, new MockOutput(outputSerializer));
}
+   SimpleOperatorFactory factory = 
SimpleOperatorFactory.of(operator);
 
 Review comment:
   @AHeise I'm re-opening this issue, as I do not see that it has changed and 
it also duplicates some code from `StreamOperatorFactoryUtil.createOperator`.
   
   Am I missing something?




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-28 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318419434
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
 ##
 @@ -106,33 +109,37 @@ public void open() {
 * Lifecycle method to close the mailbox for action submission.
 */
public void prepareClose() {
-   mailboxExecutor.shutdown();
+   mailbox.quiesce();
}
 
/**
 * Lifecycle method to close the mailbox for action 
submission/retrieval. This will cancel all instances of
 * {@link java.util.concurrent.RunnableFuture} that are still contained 
in the mailbox.
 */
public void close() {
-   List droppedLetters = mailboxExecutor.shutdownNow();
+   List droppedLetters = mailbox.close();
if (!droppedLetters.isEmpty()) {
LOG.debug("Closing the mailbox dropped letters {}.", 
droppedLetters);
FutureUtils.cancelRunnableFutures(droppedLetters);
}
}
 
+   private boolean isMailboxThread() {
+   return Thread.currentThread() == mailboxThread;
+   }
+
/**
 * Runs the mailbox processing loop. This is where the main work is 
done.
 */
public void runMailboxLoop() throws Exception {
 
Preconditions.checkState(
-   mailboxExecutor.isMailboxThread(),
+   isMailboxThread(),
 
 Review comment:
   nit: formatting?




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-28 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318428252
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 ##
 @@ -150,32 +153,76 @@ public AbstractStreamOperatorTestHarness(
int subtaskIndex,
OperatorID operatorID) throws Exception {
this(
-   operator,
-   new MockEnvironmentBuilder()
-   .setTaskName("MockTask")
-   .setMemorySize(3 * 1024 * 1024)
-   .setInputSplitProvider(new 
MockInputSplitProvider())
-   .setBufferSize(1024)
-   .setMaxParallelism(maxParallelism)
-   .setParallelism(parallelism)
-   .setSubtaskIndex(subtaskIndex)
-   .build(),
-   true,
-   operatorID);
+   operator,
 
 Review comment:
   nit: formatting




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-28 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318419056
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ##
 @@ -1194,6 +1197,37 @@ public ExecutionConfig getExecutionConfig() {
return returnStream;
}
 
+   /**
+* Method for passing user defined operators along with the type 
information that will transform the DataStream.
+*
+* @param operatorName name of the operator, for logging purposes
+* @param outTypeInfo the output type of the operator
+* @param operatorFactory the factory for the operator.
+* @param  type of the return stream
+* @return the data stream constructed
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator transform(String operatorName, 
TypeInformation outTypeInfo,
+   OneInputStreamOperatorFactory operatorFactory) {
 
 Review comment:
   It makes it less convenient for power users, but it's necessary for a cleaner 
API and runtime code. I would mark it as deprecated, because I would like at 
least Flink's built-in operators to move away from this old API in favour of 
`StreamOperatorFactory` as soon as possible.
   
   But ultimately we can merge it with or without the `@Deprecated` annotation; 
the question is really for @aljoscha: which does he prefer?




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317577599
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
 ##
 @@ -58,9 +58,10 @@
 * Compare the two queues containing operator/task output by converting 
them to an array first.
 */
public static  void assertOutputEquals(String message, Queue 
expected, Queue actual) {
-   Assert.assertArrayEquals(message,
-   expected.toArray(),
-   actual.toArray());
+   Assert.assertEquals(
 
 Review comment:
   Are those changes related to this commit? If not, could you pull them to a 
separate commit?




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317581559
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -162,12 +166,20 @@ public void setup(StreamTask containingTask, 
StreamConfig config, Output(checkpointingLock, output, queue, 
this);
+   this.emitter = new Emitter<>(checkpointingLock,
 
 Review comment:
   nit: I think the recent consensus about wrapping parameters was that the 
first parameter should also be on a new line:
   https://github.com/apache/flink-web/pull/254




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317625597
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ##
 @@ -1194,6 +1197,37 @@ public ExecutionConfig getExecutionConfig() {
return returnStream;
}
 
+   /**
+* Method for passing user defined operators along with the type 
information that will transform the DataStream.
+*
+* @param operatorName name of the operator, for logging purposes
+* @param outTypeInfo the output type of the operator
+* @param operatorFactory the factory for the operator.
+* @param  type of the return stream
+* @return the data stream constructed
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator transform(String operatorName, 
TypeInformation outTypeInfo,
 
 Review comment:
   nit: formatting (each parameter in a separate line)




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317573452
 
 

 ##
 File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 ##
 @@ -72,6 +74,10 @@ protected void init() throws Exception {
// re-initialize the operator with the correct collector.
StreamOperatorFactory operatorFactory = 
configuration.getStreamOperatorFactory(getUserCodeClassLoader());
headOperator = operatorFactory.createStreamOperator(this, 
configuration, new CollectorWrapper<>(collector));
+   if (operatorFactory instanceof YieldingOperatorFactory) {
 
 Review comment:
   Should we deduplicate this `if` check? It's present in 4 different places. 
Maybe as a static helper method in `YieldingOperatorFactory`?
   
   edit:
   I would also deduplicate this logic together with the 
`createStreamOperator(...)` call, because now in four different places (and in 
all future ones) we must make sure that `setMailboxExecutor(...)` has been 
called before creating the `StreamOperator`. This place, for example, does it 
after the creation. Another place below does not seem to create the operator at 
all.
   
   So I would extract this logic into a utility class, as a static method in 
`StreamOperatorFactoryUtil`?
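
   A minimal sketch of what such a helper could look like. The interfaces below 
are simplified, hypothetical stand-ins for the Flink types named in this 
comment (the real `createStreamOperator` takes a task, a config and an output), 
and `createOperator` with a `Supplier` argument is an assumed signature, not 
the one from the PR. The point is only that the ordering is enforced in one 
place: set the executor first, create the operator second.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins for the Flink types discussed above.
final class FactoryUtilSketch {

    interface MailboxExecutor {}

    interface StreamOperator {}

    interface StreamOperatorFactory {
        StreamOperator createStreamOperator();
    }

    interface YieldingOperatorFactory {
        void setMailboxExecutor(MailboxExecutor mailboxExecutor);
    }

    /**
     * Single place that enforces the ordering: the mailbox executor is
     * injected first, and only then is the operator created.
     */
    static StreamOperator createOperator(
            StreamOperatorFactory factory,
            Supplier<MailboxExecutor> mailboxExecutorSupplier) {
        if (factory instanceof YieldingOperatorFactory) {
            ((YieldingOperatorFactory) factory)
                .setMailboxExecutor(mailboxExecutorSupplier.get());
        }
        return factory.createStreamOperator();
    }

    /** Example factory that records whether the executor was set before creation. */
    static final class RecordingFactory
            implements StreamOperatorFactory, YieldingOperatorFactory {
        private MailboxExecutor mailboxExecutor;
        boolean executorPresentAtCreation;

        @Override
        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        @Override
        public StreamOperator createStreamOperator() {
            executorPresentAtCreation = mailboxExecutor != null;
            return new StreamOperator() {};
        }
    }

    public static void main(String[] args) {
        RecordingFactory factory = new RecordingFactory();
        createOperator(factory, () -> new MailboxExecutor() {});
        System.out.println(
            "executor set before creation: " + factory.executorPresentAtCreation);
    }
}
```

   With a helper like this, call sites no longer repeat the `instanceof` check, 
and none of them can create the operator before the executor is set.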




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317626081
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
 ##
 @@ -17,22 +17,68 @@
 
 package org.apache.flink.streaming.api.operators.async;
 
-import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
 
 /**
  * The factory of {@link AsyncWaitOperator}.
  *
  * @param  The output type of the operator
  */
-public class AsyncWaitOperatorFactory extends 
SimpleUdfStreamOperatorFactory implements YieldingOperatorFactory {
-   public AsyncWaitOperatorFactory(AsyncWaitOperator operator) {
-   super(operator);
+public class AsyncWaitOperatorFactory implements 
OneInputStreamOperatorFactory, YieldingOperatorFactory {
+   private final AsyncFunction asyncFunction;
+   private final long timeout;
+   private final int capacity;
+   private final AsyncDataStream.OutputMode outputMode;
+   private MailboxExecutor mailboxExecutor;
+   private ChainingStrategy strategy = ChainingStrategy.HEAD;
+
+   public AsyncWaitOperatorFactory(AsyncFunction asyncFunction,
+   long timeout,
+   int capacity,
+   AsyncDataStream.OutputMode outputMode) {
+   this.asyncFunction = asyncFunction;
+   this.timeout = timeout;
+   this.capacity = capacity;
+   this.outputMode = outputMode;
}
 
@Override public void setMailboxExecutor(MailboxExecutor 
mailboxExecutor) {
-   AsyncWaitOperator operator = (AsyncWaitOperator) getOperator();
-   operator.setMailboxExecutor(mailboxExecutor);
+   this.mailboxExecutor = mailboxExecutor;
+   }
+
+   @Override public StreamOperator createStreamOperator(StreamTask 
containingTask, StreamConfig config,
+   Output output) {
+   AsyncWaitOperator asyncWaitOperator = new 
AsyncWaitOperator(asyncFunction,
+   timeout,
+   capacity,
+   outputMode,
+   mailboxExecutor);
 
 Review comment:
   `checkNotNull(mailboxExecutor)`?
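
   A small sketch of the intent, assuming simplified stand-in classes rather 
than the real factory and operator. `Objects.requireNonNull` from the JDK is 
used here only to keep the example self-contained; in Flink code the 
equivalent would be the `checkNotNull` named above.

```java
import java.util.Objects;

// Hypothetical, simplified stand-ins; not the classes from the PR.
final class NullCheckSketch {

    interface MailboxExecutor {}

    static final class Operator {
        final MailboxExecutor mailboxExecutor;

        Operator(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }
    }

    static final class Factory {
        private MailboxExecutor mailboxExecutor;

        void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        Operator createStreamOperator() {
            // Fail fast at creation time instead of with a late
            // NullPointerException inside the operator's wait loop.
            return new Operator(Objects.requireNonNull(
                mailboxExecutor,
                "mailboxExecutor must be set before creating the operator"));
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory();
        try {
            factory.createStreamOperator();
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        factory.setMailboxExecutor(new MailboxExecutor() {});
        System.out.println("created: " + (factory.createStreamOperator() != null));
    }
}
```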




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317627543
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ##
 @@ -170,13 +170,10 @@ public void run() {
}
 
/**
-* A special {@link AsyncFunction} without issuing
-* {@link ResultFuture#complete} until the latch counts to zero.
-* {@link ResultFuture#complete} until the latch counts to zero.
-* This function is used in the testStateSnapshotAndRestore, ensuring
-* that {@link StreamElementQueueEntry} can stay
-* in the {@link StreamElementQueue} to be
-* snapshotted while checkpointing.
+* A special {@link AsyncFunction} without issuing {@link 
ResultFuture#complete} until the latch counts to zero.
 
 Review comment:
   Are the changes in this file relevant to the commit? They look like an 
auto-formatting issue.
   
   Also, it looks like some of those "auto formatted" lines do not follow 
our code style.




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317574128
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+
+/**
+ * An operator that needs access to the {@link MailboxExecutor} to yield to 
downstream operators needs to be created
+ * through a factory implementing this interface.
+ */
+public interface YieldingOperatorFactory {
 
 Review comment:
   Shouldn't this extend `StreamOperatorFactory`? I know it doesn't matter 
from the code perspective, but it might be more self-documenting.
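
   To illustrate the self-documenting aspect, here is a sketch with simplified, 
hypothetical stand-ins for the interfaces (the real `createStreamOperator` has 
parameters that are omitted here). Once the yielding interface extends the 
factory interface, a method that receives a `YieldingOperatorFactory` can 
create the operator without a cast.

```java
// Hypothetical, simplified stand-ins; signatures are assumptions.
final class YieldingFactorySketch {

    interface MailboxExecutor {}

    interface StreamOperator {}

    interface StreamOperatorFactory {
        StreamOperator createStreamOperator();
    }

    /** The type itself now states that this is an operator factory. */
    interface YieldingOperatorFactory extends StreamOperatorFactory {
        void setMailboxExecutor(MailboxExecutor mailboxExecutor);
    }

    static StreamOperator create(
            YieldingOperatorFactory factory,
            MailboxExecutor executor) {
        factory.setMailboxExecutor(executor);
        // No cast needed: a yielding factory is a StreamOperatorFactory.
        return factory.createStreamOperator();
    }

    static final class ExampleFactory implements YieldingOperatorFactory {
        private MailboxExecutor mailboxExecutor;

        @Override
        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        @Override
        public StreamOperator createStreamOperator() {
            return new StreamOperator() {};
        }
    }

    public static void main(String[] args) {
        StreamOperator operator =
            create(new ExampleFactory(), new MailboxExecutor() {});
        System.out.println("created: " + (operator != null));
    }
}
```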




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317578748
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ##
 @@ -423,18 +434,25 @@ public boolean hasSelectiveReadingOperator() {
ClassLoader userCodeClassloader,
Map> streamOutputs,
List> allOperators,
-   OutputTag outputTag) {
+   OutputTag outputTag,
+   IntFunction mailboxExecutorFactory) {
 
 Review comment:
   Instead of `IntFunction`, maybe define a simple 
`MailboxFactory` class that has a nicely named method with a named parameter? 
As it is now, readability depends heavily on the usage, and in the signature of 
this constructor it's not obvious what this `IntFunction` should accept. 
   
   Actually, even the usage here is not obvious: 
`mailboxExecutorFactory.apply(operatorConfig.getChainIndex());`, since we think 
of this parameter as a "priority" and call it that for the sake of 
yielding.
   
   Another issue is that finding usages of such generic interfaces is a bit 
problematic.
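
   A rough sketch of the difference, under the assumption that the factory maps 
a priority to an executor. The method name `createExecutor`, the parameter name 
`priority`, and the simplified `MailboxExecutor` class are all hypothetical and 
chosen for illustration only.

```java
import java.util.function.IntFunction;

// Hypothetical, simplified stand-ins; names are illustrative assumptions.
final class MailboxFactorySketch {

    /** Simplified executor that only records the priority it was created with. */
    static final class MailboxExecutor {
        final int priority;

        MailboxExecutor(int priority) {
            this.priority = priority;
        }
    }

    /** Named alternative to a bare IntFunction of MailboxExecutor. */
    interface MailboxFactory {
        MailboxExecutor createExecutor(int priority);
    }

    public static void main(String[] args) {
        int chainIndex = 2; // stands in for operatorConfig.getChainIndex()

        // Before: the meaning of the int is only visible from the call site.
        IntFunction<MailboxExecutor> anonymous = MailboxExecutor::new;
        MailboxExecutor viaIntFunction = anonymous.apply(chainIndex);

        // After: method and parameter names document the contract, and
        // "find usages" on createExecutor returns only relevant call sites.
        MailboxFactory named = MailboxExecutor::new;
        MailboxExecutor viaNamedFactory = named.createExecutor(chainIndex);

        System.out.println(viaIntFunction.priority == viaNamedFactory.priority);
    }
}
```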




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317628154
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 ##
 @@ -165,18 +169,61 @@ public AbstractStreamOperatorTestHarness(
operatorID);
}
 
+   public AbstractStreamOperatorTestHarness(
+   StreamOperatorFactory factory,
+   MockEnvironment env) throws Exception {
+   this(null, factory, env, false, new OperatorID());
+   }
+
+   public AbstractStreamOperatorTestHarness(
+   StreamOperatorFactory factory,
+   int maxParallelism,
+   int parallelism,
+   int subtaskIndex) throws Exception {
+   this(
+   factory,
 
 Review comment:
   nit: something doesn't look right with this formatting.




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317575004
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 ##
 @@ -283,13 +287,23 @@ public void setup(TypeSerializer outputSerializer) {
streamTaskStateInitializer =
createStreamTaskStateManager(environment, 
stateBackend, processingTimeService);

mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);
+
if (operator instanceof SetupableStreamOperator) {
((SetupableStreamOperator) 
operator).setup(mockTask, config, new MockOutput(outputSerializer));
}
+   SimpleOperatorFactory factory = 
SimpleOperatorFactory.of(operator);
 
 Review comment:
   I'm a bit confused here. How should it work? 
`factory.createStreamOperator(...)` is not being called here.




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315072874
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
 * Add the given stream element queue entry to the operator's stream 
element queue. This
 * operation blocks until the element has been added.
 *
-* For that it tries to put the element into the queue and if not 
successful then it waits on
-* the checkpointing lock. The checkpointing lock is also used by the 
{@link Emitter} to output
-* elements. The emitter is also responsible for notifying this method 
if the queue has capacity
-* left again, by calling notifyAll on the checkpointing lock.
-*
 * @param streamElementQueueEntry to add to the operator's queue
 * @param  Type of the stream element queue entry's result
 * @throws InterruptedException if the current thread has been 
interrupted
 */
private  void addAsyncBufferEntry(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
-   pendingStreamElementQueueEntry = streamElementQueueEntry;
-
while (!queue.tryPut(streamElementQueueEntry)) {
-   // we wait for the emitter to notify us if the queue 
has space left again
-   checkpointingLock.wait();
+   yieldToDownstream();
}
-
-   pendingStreamElementQueueEntry = null;
}
 
-   private void waitInFlightInputsFinished() throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
+   private void waitInFlightInputsFinished() {
while (!queue.isEmpty()) {
-   // wait for the emitter thread to output the remaining 
elements
-   // for that he needs the checkpointing lock and thus we 
have to free it
-   checkpointingLock.wait();
+   yieldToDownstream();
 
 Review comment:
   Couldn't we block while yielding? In other words, replace 
`tryYield` with `yield`?
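
   A sketch of the distinction, using a hypothetical mailbox backed by a JDK 
`BlockingQueue`. This is not the mailbox from the PR; the method names 
`tryYieldOnce` and `yieldBlocking` are made up for this example (a method 
literally named `yield` is awkward to call in recent Java versions). A loop 
around the non-blocking variant spins while the mailbox is empty, whereas the 
blocking variant parks the thread until a letter arrives.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical mailbox; illustrates polling versus blocking only.
final class YieldSketch {

    static final class Mailbox {
        private final BlockingQueue<Runnable> letters = new LinkedBlockingQueue<>();

        void put(Runnable letter) {
            letters.add(letter);
        }

        /** Non-blocking: runs one letter if available, otherwise returns false. */
        boolean tryYieldOnce() {
            Runnable letter = letters.poll();
            if (letter == null) {
                return false;
            }
            letter.run();
            return true;
        }

        /** Blocking: waits until a letter is available, then runs it. */
        void yieldBlocking() throws InterruptedException {
            letters.take().run();
        }
    }

    /** Waits for in-flight elements without spinning; returns the number of yields. */
    static int drain(Queue<String> inFlight, Mailbox mailbox)
            throws InterruptedException {
        int yields = 0;
        while (!inFlight.isEmpty()) {
            mailbox.yieldBlocking();
            yields++;
        }
        return yields;
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> inFlight = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        Mailbox mailbox = new Mailbox();
        // Each letter completes one in-flight element, as an emitter would.
        for (int i = 0; i < 3; i++) {
            mailbox.put(inFlight::poll);
        }
        System.out.println("yields: " + drain(inFlight, mailbox));
    }
}
```

   Note that the blocking variant only terminates if something eventually 
enqueues a letter that makes progress on the in-flight queue; in the example 
the letters are enqueued up front to keep it single-threaded.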




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315066822
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
 * Add the given stream element queue entry to the operator's stream 
element queue. This
 * operation blocks until the element has been added.
 *
-* For that it tries to put the element into the queue and if not 
successful then it waits on
-* the checkpointing lock. The checkpointing lock is also used by the 
{@link Emitter} to output
-* elements. The emitter is also responsible for notifying this method 
if the queue has capacity
-* left again, by calling notifyAll on the checkpointing lock.
-*
 * @param streamElementQueueEntry to add to the operator's queue
 * @param  Type of the stream element queue entry's result
 * @throws InterruptedException if the current thread has been 
interrupted
 */
private  void addAsyncBufferEntry(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
-   pendingStreamElementQueueEntry = streamElementQueueEntry;
-
while (!queue.tryPut(streamElementQueueEntry)) {
-   // we wait for the emitter to notify us if the queue 
has space left again
-   checkpointingLock.wait();
+   yieldToDownstream();
}
-
-   pendingStreamElementQueueEntry = null;
}
 
-   private void waitInFlightInputsFinished() throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
+   private void waitInFlightInputsFinished() {
while (!queue.isEmpty()) {
-   // wait for the emitter thread to output the remaining 
elements
-   // for that he needs the checkpointing lock and thus we 
have to free it
-   checkpointingLock.wait();
+   yieldToDownstream();
 
 Review comment:
   Aren't we busy waiting here?




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-16 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314654753
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
pendingStreamElementQueueEntry = streamElementQueueEntry;
 
while (!queue.tryPut(streamElementQueueEntry)) {
+   yieldToDownstream();
+   // wait for the emitter thread to output the remaining 
elements
// we wait for the emitter to notify us if the queue 
has space left again
-   checkpointingLock.wait();
+   checkpointingLock.wait(1);
 
 Review comment:
   In Stefan R's PR, there was no need for this `wait(1)`.




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-16 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314654604
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ##
 @@ -771,9 +773,11 @@ public void testClosingWithBlockedEmitter() throws 
Exception {
MockEnvironment environment = createMockEnvironment();
 
StreamTask containingTask = mock(StreamTask.class);
+   TaskMailboxImpl mailbox = new TaskMailboxImpl();
when(containingTask.getEnvironment()).thenReturn(environment);
when(containingTask.getCheckpointLock()).thenReturn(lock);
when(containingTask.getProcessingTimeService()).thenReturn(new 
TestProcessingTimeService());
+   
when(containingTask.getTaskMailboxExecutor(any())).thenReturn(new 
MailboxExecutorImpl(mailbox));
 
 Review comment:
   Dropping mockito here should be easy:
   ```
   +import org.apache.flink.streaming.util.MockStreamTask;
   +import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;
   @@ -770,10 +772,10 @@ public class AsyncWaitOperatorTest extends TestLogger {

   MockEnvironment environment = createMockEnvironment();

   -   StreamTask containingTask = mock(StreamTask.class);
   -   
when(containingTask.getEnvironment()).thenReturn(environment);
   -   when(containingTask.getCheckpointLock()).thenReturn(lock);
   -   
when(containingTask.getProcessingTimeService()).thenReturn(new 
TestProcessingTimeService());
   +   MockStreamTask containingTask = new 
MockStreamTaskBuilder(environment)
   +   .setCheckpointLock(lock)
   +   .setProcessingTimeService(new 
TestProcessingTimeService())
   +   .build();

   StreamConfig streamConfig = new MockStreamConfig();
   streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE);
   ```
   and adding a new setter and accessor for `getMailboxExecutor(int ignored) -> 
mailbox`
   
   If it's indeed that simple, I would prefer it as a `hotfix` commit in this PR, 
preferably as a base commit before your changes, so that there is no 
intermediate code that gets removed in a later commit. But if you do not 
want the rebase hassle, I would also be OK with a `hotfix` commit on top of your 
change. 




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-16 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314654570
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ##
 @@ -232,6 +233,24 @@ public void setup(StreamTask containingTask, 
StreamConfig config, Output
+* This method should be called whenever an operator would need to 
block the task thread to wait for output
+* buffers to flush.
+* If such an operator would indeed block the task thread, deadlocks 
can arise when two such operators are
+* chained.
+* It's up to the implementor of the operator to find a good 
trade-off, between yielding and own processing.
+* If the operator never blocks the task thread during input 
processing, this method should not be called at
+* all.
+* This method indicates whether downstream events actually have 
been processed, which can be used to issue
+* further invocations.
+*
+* @return true if any downstream event has been handled.
+*/
+   protected boolean yieldToDownstream() {
+   throw new NotImplementedException("API draft");
 
 Review comment:
   I'm just not sure how to expose it to the users.
   
   Instead of `AbstractStreamOperator#yield()` we could have 
`StreamOperator#yield()`, or `StreamTask#yield()`. For the last one, 
`StreamTask` is accessible via `SetupableStreamOperator#setup()` and 
`StreamOperatorFactory#createStreamOperator()` methods.




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314316038
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
pendingStreamElementQueueEntry = streamElementQueueEntry;
 
while (!queue.tryPut(streamElementQueueEntry)) {
+   yieldToDownstream();
+   // wait for the emitter thread to output the remaining 
elements
// we wait for the emitter to notify us if the queue 
has space left again
-   checkpointingLock.wait();
+   checkpointingLock.wait(1);
 
 Review comment:
   Why `.wait(1)`?
   
   I would expect more or less the same code as Stefan R proposed in his 
version, but encapsulated/hidden from the user in `yieldToDownstream()`. See 
my comment here: https://github.com/apache/flink/pull/9426#discussion_r314195257




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314312315
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ##
 @@ -232,6 +233,24 @@ public void setup(StreamTask containingTask, 
StreamConfig config, Output
+* This method should be called whenever an operator would need to 
block the task thread to wait for output
+* buffers to flush.
+* If such an operator would indeed block the task thread, deadlocks 
can arise when two such operators are
+* chained.
+* It's up to the implementor of the operator to find a good 
trade-off, between yielding and own processing.
+* If the operator never blocks the task thread during input 
processing, this method should not be called at
+* all.
+* This method indicates whether downstream events actually have 
been processed, which can be used to issue
+* further invocations.
+*
+* @return true if any downstream event has been handled.
+*/
+   protected boolean yieldToDownstream() {
+   throw new NotImplementedException("API draft");
 
 Review comment:
   nit: use `UnsupportedOperationException` instead of the Apache Commons exception
   
   Also, the question is how do we want to expose `yieldToDownstream`: via 
`AbstractStreamOperator`? Maybe `StreamOperator`? Or maybe we should allow access 
to it via the `StreamTask containingTask` passed to 
`StreamOperatorFactory#createStreamOperator` and 
`SetupableStreamOperator#setup`?
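
For the nit, a minimal sketch of what is meant; `DraftOperator` is a hypothetical placeholder class, and only the choice of exception matters. `java.lang.UnsupportedOperationException` ships with the JDK, so no extra import is needed.

```java
// Hypothetical placeholder class; not the real AbstractStreamOperator.
class DraftOperator {
    /** Draft API: returns true if any downstream event has been handled. */
    protected boolean yieldToDownstream() {
        // JDK exception from java.lang, so the commons-lang dependency is not needed
        // just to mark a method as "not implemented yet".
        throw new UnsupportedOperationException("API draft");
    }
}
```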




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314330128
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -84,10 +89,14 @@ public boolean hasMail() {
 
@Override
public Optional tryTakeMail() throws MailboxStateException {
+   return downstreamMailbox.tryTakeMail();
+   }
+
+   private Optional tryTakeDownstreamMail(int operatorIndex) 
throws MailboxStateException {
 
 Review comment:
   I was a bit confused by the class interactions here, and the call chain is 
a bit complicated (`TaskMailboxImpl#tryTakeMail` -> goes to 
`downstreamMailbox.tryTakeMail()` -> comes back here via 
`TaskMailboxImpl#tryTakeDownstreamMail`). Would it be simpler if we dropped the 
requirement that `TaskMailbox` and `Mailbox` have the same interface? 
   
   At the top level I would actually prefer having to pass 
`operatorIndex` explicitly when calling `putMail`, so that there is less 
magic. It would be more readable for me to have this lower-level API explicit, 
instead of having to figure out what `putMail(mail)` or `tryTakeMail()` are doing. 
`putMail(mail, priority)` or `tryTakeMail(priority)` is a more self-explanatory 
interface.
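
A rough sketch of the explicit variant, to illustrate the idea only. `PriorityMailboxSketch` is a hypothetical class, and the rule "take the oldest mail whose priority is at least the requested one" is an assumption, not something this PR defines.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;

// Hypothetical sketch; the real TaskMailbox interface may look different.
final class PriorityMailboxSketch {
    /** A mail tagged with the priority (e.g. operator chain index) of its sender. */
    private static final class Mail {
        final Runnable action;
        final int priority;

        Mail(Runnable action, int priority) {
            this.action = action;
            this.priority = priority;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();

    /** Enqueues at the tail; the caller states the priority explicitly. */
    synchronized void putMail(Runnable action, int priority) {
        queue.addLast(new Mail(action, priority));
    }

    /** Takes the oldest mail whose priority is at least minPriority, if any (assumed rule). */
    synchronized Optional<Runnable> tryTakeMail(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                return Optional.of(mail.action);
            }
        }
        return Optional.empty();
    }
}
```

With this shape there is no second mailbox object to bounce through: the call site shows which priority is being written or read.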




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314331799
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImplTest.java
 ##
 @@ -123,70 +123,76 @@ public void testConcurrentPutTakeNonBlockingAndWait() 
throws Exception {
 */
@Test
public void testCloseUnblocks() throws InterruptedException {
-   testAllPuttingUnblocksInternal(Mailbox::close);
+   testAllPuttingUnblocksInternal(TaskMailbox::close);
setUp();
-   testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close);
+   testUnblocksInternal(() -> taskMailbox.takeMail(), 
TaskMailbox::close);
}
 
/**
 * Test that silencing the mailbox unblocks pending accesses with 
correct exceptions.
 */
@Test
public void testQuiesceUnblocks() throws Exception {
-   testAllPuttingUnblocksInternal(Mailbox::quiesce);
+   testAllPuttingUnblocksInternal(TaskMailbox::quiesce);
}
 
@Test
public void testLifeCycleQuiesce() throws Exception {
-   mailbox.putMail(() -> {});
-   mailbox.putMail(() -> {});
-   mailbox.quiesce();
+   taskMailbox.putMail(() -> {
+   });
 
 Review comment:
   optional nit: remove new line?




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314330433
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
 ##
 @@ -33,4 +33,13 @@
 * @throws MailboxStateException if the mailbox is quiesced or closed.
 */
void putMail(@Nonnull Runnable letter) throws  MailboxStateException;
+
+   /**
+* Adds the given action to the head of the mailbox.
+*
+* @param priorityLetter action to enqueue to the head of the mailbox.
+* @throws MailboxStateException if the mailbox is quiesced or closed.
+*/
+   void putFirst(@Nonnull Runnable priorityLetter) throws 
MailboxStateException;
+
 
 Review comment:
   nit: unnecessary new line?




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314335073
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImplTest.java
 ##
 @@ -256,11 +262,54 @@ private void testPutTake(
writerThread.join();
}
 
-   mailbox.putMail(POISON_LETTER);
+   taskMailbox.putMail(POISON_LETTER);
 
readerThread.join();
for (int perThreadResult : results) {
Assert.assertEquals(numLettersPerThread, 
perThreadResult);
}
}
+
+   /**
+* Tests the downstream view of {@link TaskMailbox}.
+*/
+   public static class DownstreamMailboxTest {
+   /**
+* Object under test.
+*/
+   private TaskMailboxImpl taskMailbox;
+
+   @Before
+   public void setUp() {
+   taskMailbox = new TaskMailboxImpl();
+   taskMailbox.open();
+   }
+
+   @After
+   public void tearDown() {
+   taskMailbox.close();
+   }
+
+   @Test
+   public void testPutAsHead() throws Exception {
+
+   Runnable instanceA = () -> {};
+   Runnable instanceB = () -> {};
+   Runnable instanceC = () -> {};
+   Runnable instanceD = () -> {};
+
+   taskMailbox.getDownstreamMailbox(1).putMail(instanceC);
+   taskMailbox.getDownstreamMailbox(2).putMail(instanceB);
+   taskMailbox.getDownstreamMailbox(1).putMail(instanceD);
+   taskMailbox.getDownstreamMailbox(2).putFirst(instanceA);
+
+   Assert.assertSame(instanceA, 
taskMailbox.getDownstreamMailbox(2).takeMail());
 
 Review comment:
   Maybe we should add the same test (adding mail via downstream mailboxes), 
but consuming from the `taskMailbox` directly (not via the downstream 
mailboxes, as this one does).
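
Roughly what such a test would check, shown against a simplified model rather than the real classes. `TaskMailboxModel` and `DownstreamView` are hypothetical, and the expected order (a `putFirst` mail first, then the rest in insertion order) is an assumption about the intended semantics, not the actual `TaskMailboxImpl` behavior.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of a task mailbox with per-operator views.
final class TaskMailboxModel {
    private static final class Mail {
        final Runnable action;
        final int operatorIndex; // recorded for per-operator takes; unused by the task-level take

        Mail(Runnable action, int operatorIndex) {
            this.action = action;
            this.operatorIndex = operatorIndex;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();

    /** View that tags every mail with a fixed operator index. */
    final class DownstreamView {
        private final int operatorIndex;

        DownstreamView(int operatorIndex) {
            this.operatorIndex = operatorIndex;
        }

        void putMail(Runnable action) {
            queue.addLast(new Mail(action, operatorIndex));
        }

        void putFirst(Runnable action) {
            queue.addFirst(new Mail(action, operatorIndex));
        }
    }

    DownstreamView getDownstreamMailbox(int operatorIndex) {
        return new DownstreamView(operatorIndex);
    }

    /** Task-level take: ignores operator indices; returns the head or null if empty. */
    Runnable tryTakeMail() {
        Mail mail = queue.pollFirst();
        return mail == null ? null : mail.action;
    }
}
```

The test would then put mails through the views for operators 1 and 2, drain via `tryTakeMail()` on the task-level object, and assert on the resulting order.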




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314322494
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -251,4 +280,79 @@ public void quiesce() {
public State getState() {
return state;
}
+
+   @Override
+   public Mailbox getDownstreamMailbox(AbstractStreamOperator operator) 
{
+   return 
getDownstreamMailbox(operator.getOperatorConfig().getChainIndex());
 
 Review comment:
   +1 for @1u0's comment.
   
   A dependency on `AbstractStreamOperator` is a very heavy one and will make 
testing/reuse difficult. I would like to avoid it, even if future 
changes show that `int priority` is not enough. Keep in mind that we can 
always refactor this later, and using `int priority` now will not cause 
us much (if any) overhead if we need to change it.




[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314315851
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ##
 @@ -771,9 +773,11 @@ public void testClosingWithBlockedEmitter() throws 
Exception {
MockEnvironment environment = createMockEnvironment();
 
StreamTask containingTask = mock(StreamTask.class);
+   TaskMailboxImpl mailbox = new TaskMailboxImpl();
when(containingTask.getEnvironment()).thenReturn(environment);
when(containingTask.getCheckpointLock()).thenReturn(lock);
when(containingTask.getProcessingTimeService()).thenReturn(new 
TestProcessingTimeService());
+   
when(containingTask.getTaskMailboxExecutor(any())).thenReturn(new 
MailboxExecutorImpl(mailbox));
 
 Review comment:
   Arguably you are making this test worse by adding an extra Mockito line here.
   
   You could try to replace it with `MockStreamTaskBuilder`.

