[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-28 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318611759
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ##
 @@ -1194,6 +1197,37 @@ public ExecutionConfig getExecutionConfig() {
return returnStream;
}
 
+   /**
+* Method for passing user defined operators along with the type 
information that will transform the DataStream.
+*
+* @param operatorName name of the operator, for logging purposes
+* @param outTypeInfo the output type of the operator
+* @param operatorFactory the factory for the operator.
+* @param  type of the return stream
+* @return the data stream constructed
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator transform(String operatorName, 
TypeInformation outTypeInfo,
+   OneInputStreamOperatorFactory operatorFactory) {
 
 Review comment:
   Aljoscha is fine with the current state.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-28 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318435616
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 ##
 @@ -283,13 +287,23 @@ public void setup(TypeSerializer outputSerializer) {
streamTaskStateInitializer =
createStreamTaskStateManager(environment, 
stateBackend, processingTimeService);

mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);
+
if (operator instanceof SetupableStreamOperator) {
((SetupableStreamOperator) 
operator).setup(mockTask, config, new MockOutput(outputSerializer));
}
+   SimpleOperatorFactory factory = 
SimpleOperatorFactory.of(operator);
 
 Review comment:
   This was outdated, but it still wasn't using `StreamOperatorFactoryUtil`, so 
I fixed that.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-27 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318129783
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
 ##
 @@ -58,9 +58,10 @@
 * Compare the two queues containing operator/task output by converting 
them to an array first.
 */
public static  void assertOutputEquals(String message, Queue 
expected, Queue actual) {
-   Assert.assertArrayEquals(message,
-   expected.toArray(),
-   actual.toArray());
+   Assert.assertEquals(
 
 Review comment:
   Removed them.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-27 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318129693
 
 

 ##
 File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 ##
 @@ -72,6 +74,10 @@ protected void init() throws Exception {
// re-initialize the operator with the correct collector.
StreamOperatorFactory operatorFactory = 
configuration.getStreamOperatorFactory(getUserCodeClassLoader());
headOperator = operatorFactory.createStreamOperator(this, 
configuration, new CollectorWrapper<>(collector));
+   if (operatorFactory instanceof YieldingOperatorFactory) {
 
 Review comment:
   Went with StreamOperatorFactoryUtil for now. Please check if you have better 
ideas.
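
   For illustration, the `instanceof` check from the hunk above could be centralized in one helper roughly as follows. This is a minimal sketch with simplified stand-in types, not the actual Flink classes: `OperatorFactory`, `YieldingFactory`, `setMailboxExecutor`, and `FactoryUtilSketch.createOperator` are hypothetical names chosen for this example.

```java
import java.util.concurrent.Executor;
import java.util.function.Supplier;

public class FactoryUtilSketch {
    /** Hypothetical, simplified stand-in for an operator factory. */
    public interface OperatorFactory<T> {
        T createOperator();
    }

    /** Hypothetical stand-in for a factory whose operators need a mailbox executor. */
    public interface YieldingFactory<T> extends OperatorFactory<T> {
        void setMailboxExecutor(Executor mailboxExecutor);
    }

    /**
     * Single place that performs the instanceof check, so that call sites
     * (task, test harness, ...) do not have to duplicate it.
     * The executor is only requested when the factory actually needs it.
     */
    public static <T> T createOperator(
            OperatorFactory<T> factory, Supplier<Executor> mailboxExecutor) {
        if (factory instanceof YieldingFactory) {
            ((YieldingFactory<T>) factory).setMailboxExecutor(mailboxExecutor.get());
        }
        return factory.createOperator();
    }
}
```

   With such a helper, each call site would invoke `createOperator` instead of repeating the check, which is the duplication concern raised in this thread.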




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-27 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317994445
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -27,19 +27,19 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Implementation of {@link Mailbox} inspired by {@link 
java.util.concurrent.ArrayBlockingQueue} and tailored towards
+ * Implementation of {@link TaskMailbox} inspired by {@link 
java.util.concurrent.ArrayBlockingQueue} and tailored towards
 
 Review comment:
   I changed the comment to a more general reference to blocking queues.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-27 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317991596
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ##
 @@ -1194,6 +1197,37 @@ public ExecutionConfig getExecutionConfig() {
return returnStream;
}
 
+   /**
+* Method for passing user defined operators along with the type 
information that will transform the DataStream.
+*
+* @param operatorName name of the operator, for logging purposes
+* @param outTypeInfo the output type of the operator
+* @param operatorFactory the factory for the operator.
+* @param  type of the return stream
+* @return the data stream constructed
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator transform(String operatorName, 
TypeInformation outTypeInfo,
+   OneInputStreamOperatorFactory operatorFactory) {
 
 Review comment:
   I left it as PublicEvolving but added a comment that the other method is 
preferred.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-27 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317991063
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ##
 @@ -1172,8 +1173,10 @@ public ExecutionConfig getExecutionConfig() {
 * @param 
 *type of the return stream
 * @return the data stream constructed
+* @see #transform(String, TypeInformation, 
OneInputStreamOperatorFactory)
 */
@PublicEvolving
+   @Deprecated
public  SingleOutputStreamOperator transform(String operatorName, 
TypeInformation outTypeInfo, OneInputStreamOperator operator) {
 
 Review comment:
   Eliminated the duplicate and removed @Deprecated.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-27 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317971162
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
 * Add the given stream element queue entry to the operator's stream 
element queue. This
 * operation blocks until the element has been added.
 *
-* For that it tries to put the element into the queue and if not 
successful then it waits on
-* the checkpointing lock. The checkpointing lock is also used by the 
{@link Emitter} to output
-* elements. The emitter is also responsible for notifying this method 
if the queue has capacity
-* left again, by calling notifyAll on the checkpointing lock.
-*
 * @param streamElementQueueEntry to add to the operator's queue
 * @param  Type of the stream element queue entry's result
 * @throws InterruptedException if the current thread has been 
interrupted
 */
private  void addAsyncBufferEntry(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
-   pendingStreamElementQueueEntry = streamElementQueueEntry;
-
while (!queue.tryPut(streamElementQueueEntry)) {
-   // we wait for the emitter to notify us if the queue 
has space left again
-   checkpointingLock.wait();
+   yieldToDownstream();
}
-
-   pendingStreamElementQueueEntry = null;
}
 
-   private void waitInFlightInputsFinished() throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
+   private void waitInFlightInputsFinished() {
while (!queue.isEmpty()) {
-   // wait for the emitter thread to output the remaining 
elements
-   // for that he needs the checkpointing lock and thus we 
have to free it
-   checkpointingLock.wait();
+   yieldToDownstream();
 
 Review comment:
   The AsyncWaitOperator commit will be moved to another PR.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-27 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317956306
 
 

 ##
 File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 ##
 @@ -72,6 +74,10 @@ protected void init() throws Exception {
// re-initialize the operator with the correct collector.
StreamOperatorFactory operatorFactory = 
configuration.getStreamOperatorFactory(getUserCodeClassLoader());
headOperator = operatorFactory.createStreamOperator(this, 
configuration, new CollectorWrapper<>(collector));
+   if (operatorFactory instanceof YieldingOperatorFactory) {
 
 Review comment:
   Yes, we should find a way to avoid duplicates; I'll explore some 
alternatives to factory util.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-27 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317955279
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+
+/**
+ * An operator that needs access to the {@link MailboxExecutor} to yield to 
downstream operators needs to be created
+ * through a factory implementing this interface.
+ */
+public interface YieldingOperatorFactory {
 
 Review comment:
   Yes, it makes stuff easier to understand.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-27 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317955614
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+
+/**
+ * An operator that needs access to the {@link MailboxExecutor} to yield to 
downstream operators needs to be created
+ * through a factory implementing this interface.
+ */
+public interface YieldingOperatorFactory {
 
 Review comment:
   Not needed code-wise, because we are currently resorting to instanceof 
checks.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-20 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315558967
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ##
 @@ -227,11 +233,73 @@ public void setup(StreamTask containingTask, 
StreamConfig config, OutputThis method should be called whenever an operator would need to 
block the task thread to wait for output
 
 Review comment:
   Okay, I will remove it then. But FYI, HTML5 is as strict as XML, so you need 
to have a closing tag.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-20 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315556403
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
 ##
 @@ -97,22 +102,18 @@ public void run() {
}
}
 
-   private void output(AsyncResult asyncResult) throws 
InterruptedException {
+   /**
+* Executed as a mail in the mailbox thread. Output needs to be guarded 
with checkpoint lock (for the time being).
+*
+* @param asyncResult the result to output.
+*/
+   private void output(AsyncResult asyncResult) {
if (asyncResult.isWatermark()) {
-   synchronized (checkpointLock) {
-   AsyncWatermarkResult asyncWatermarkResult = 
asyncResult.asWatermark();
+   AsyncWatermarkResult asyncWatermarkResult = 
asyncResult.asWatermark();
 
 Review comment:
   Moved the poll inside the checkpoint lock.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-20 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315548601
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ##
 @@ -227,11 +233,73 @@ public void setup(StreamTask containingTask, 
StreamConfig config, OutputThis method should be called whenever an operator would need to 
block the task thread to wait for output
 
 Review comment:
   Since we are slowly adopting newer Java versions, it's quite likely that we 
will also switch to the newer Javadoc with HTML5. I'd prefer not to revert to 
HTML4 unless you think it is confusing to devs and users.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-20 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315546676
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
 * Add the given stream element queue entry to the operator's stream 
element queue. This
 * operation blocks until the element has been added.
 *
-* For that it tries to put the element into the queue and if not 
successful then it waits on
-* the checkpointing lock. The checkpointing lock is also used by the 
{@link Emitter} to output
-* elements. The emitter is also responsible for notifying this method 
if the queue has capacity
-* left again, by calling notifyAll on the checkpointing lock.
-*
 * @param streamElementQueueEntry to add to the operator's queue
 * @param  Type of the stream element queue entry's result
 * @throws InterruptedException if the current thread has been 
interrupted
 */
private  void addAsyncBufferEntry(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
-   pendingStreamElementQueueEntry = streamElementQueueEntry;
-
while (!queue.tryPut(streamElementQueueEntry)) {
-   // we wait for the emitter to notify us if the queue 
has space left again
-   checkpointingLock.wait();
+   yieldToDownstream();
}
-
-   pendingStreamElementQueueEntry = null;
}
 
-   private void waitInFlightInputsFinished() throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
+   private void waitInFlightInputsFinished() {
while (!queue.isEmpty()) {
-   // wait for the emitter thread to output the remaining 
elements
-   // for that he needs the checkpointing lock and thus we 
have to free it
-   checkpointingLock.wait();
+   yieldToDownstream();
 
 Review comment:
   If we yield now, before processing timers are migrated, we can run into 
deadlocks.
   
   Imagine the following: asyncIO uses an external system that is currently 
unavailable, and the implementation relies on timeouts to occur. The queue of 
the AsyncWaitOperator is then full of tasks that can only complete once their 
timeout fires. At that point, we need to release the checkpoint lock instead 
of yielding (since processing timers are not migrated yet). Thus, we need to 
alternate between yieldToDownstream and releasing the checkpoint lock.
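
   The alternation described above can be sketched as follows. This is a simplified, self-contained model and not the operator's actual code: the class `AlternatingWaitSketch`, its queue of strings, and the methods `sendMail`, `add`, `onTimeout`, and `demo` are assumptions made for illustration. The only parts taken from the discussion are the idea of yielding to downstream mails and of briefly releasing the checkpoint lock with a timed `wait`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class AlternatingWaitSketch {
    private final Object checkpointLock = new Object();
    // Stand-in for the task mailbox that holds mails of downstream operators.
    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    // Bounded queue of in-flight async requests, guarded by checkpointLock.
    private final Queue<String> inFlight = new ArrayDeque<>();
    private final int capacity;

    public AlternatingWaitSketch(int capacity) {
        this.capacity = capacity;
    }

    public void sendMail(Runnable mail) {
        mailbox.add(mail);
    }

    /** Runs one pending mail, if any, and reports whether a mail was run. */
    private boolean yieldToDownstream() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    /** Task thread: blocks until the element fits into the bounded queue. */
    public void add(String element) throws InterruptedException {
        synchronized (checkpointLock) {
            while (inFlight.size() >= capacity) {
                yieldToDownstream();
                // Briefly release the lock so that a timer thread can fire a timeout.
                checkpointLock.wait(1);
            }
            inFlight.add(element);
        }
    }

    /** Timer thread: a timeout completes the oldest request and frees capacity. */
    public void onTimeout() {
        synchronized (checkpointLock) {
            inFlight.poll();
            checkpointLock.notifyAll();
        }
    }

    public int size() {
        synchronized (checkpointLock) {
            return inFlight.size();
        }
    }

    /** Usage: a full queue is drained by a timeout while the task thread waits. */
    public static int demo() {
        AlternatingWaitSketch op = new AlternatingWaitSketch(1);
        Thread timer = new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            op.onTimeout();
        });
        try {
            op.add("first");
            timer.start();
            op.add("second"); // succeeds once the timer thread has freed capacity
            timer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return op.size();
    }
}
```

   If `add` only yielded and never released the lock, the timer thread in `demo` could not enter `onTimeout`, and the second `add` would spin forever. That is the deadlock scenario in miniature.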




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315172658
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -84,10 +89,14 @@ public boolean hasMail() {
 
@Override
public Optional tryTakeMail() throws MailboxStateException {
+   return downstreamMailbox.tryTakeMail();
+   }
+
+   private Optional tryTakeDownstreamMail(int operatorIndex) 
throws MailboxStateException {
 
 Review comment:
   Dropped mailbox interface.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315172523
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
pendingStreamElementQueueEntry = streamElementQueueEntry;
 
while (!queue.tryPut(streamElementQueueEntry)) {
+   yieldToDownstream();
+   // wait for the emitter thread to output the remaining 
elements
// we wait for the emitter to notify us if the queue 
has space left again
-   checkpointingLock.wait();
+   checkpointingLock.wait(1);
 
 Review comment:
   Also using Stefan's approach of enqueueing the result in the mailbox.
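
   A rough sketch of what enqueueing a result in a mailbox can look like, under simplifying assumptions: the class `MailboxEmitSketch`, the `ConcurrentLinkedQueue` used as a mailbox, and the methods `onAsyncResult` and `drainMailbox` are hypothetical and only model the idea. The completion callback, which may run on any thread, does not emit directly; it posts a mail that the task thread runs later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MailboxEmitSketch {
    // Thread-safe stand-in for the task mailbox.
    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    // Only touched by the thread that drains the mailbox.
    private final List<String> emitted = new ArrayList<>();

    /** Turns the async result into a mail instead of emitting it from the callback thread. */
    public void onAsyncResult(CompletableFuture<String> result) {
        result.thenAccept(value -> mailbox.add(() -> emitted.add(value)));
    }

    /** Task thread: runs all pending mails and returns how many were run. */
    public int drainMailbox() {
        int processed = 0;
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
            processed++;
        }
        return processed;
    }

    public List<String> emitted() {
        return emitted;
    }
}
```

   Because output then happens only on the thread that drains the mailbox, the emitting code itself needs no extra synchronization in this model.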




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315172298
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ##
 @@ -771,9 +773,11 @@ public void testClosingWithBlockedEmitter() throws 
Exception {
MockEnvironment environment = createMockEnvironment();
 
StreamTask containingTask = mock(StreamTask.class);
+   TaskMailboxImpl mailbox = new TaskMailboxImpl();
when(containingTask.getEnvironment()).thenReturn(environment);
when(containingTask.getCheckpointLock()).thenReturn(lock);
when(containingTask.getProcessingTimeService()).thenReturn(new 
TestProcessingTimeService());
+   
when(containingTask.getTaskMailboxExecutor(any())).thenReturn(new 
MailboxExecutorImpl(mailbox));
 
 Review comment:
   Done




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315136528
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ##
 @@ -232,6 +233,24 @@ public void setup(StreamTask containingTask, 
StreamConfig config, OutputThis method should be called whenever an operator would need to 
block the task thread to wait for output
+* buffers to flush.
+* If such an operator would indeed block the task thread, deadlocks 
can arise when two such operators are
+* chained.
+* It's up to the implementor of the operator to find a good 
trade-off, between yielding and own processing.
+* If the operator never blocks the task thread during input 
processing, this method should not be called at
+* all.
+* This method indicates whether downstream events actually have 
been processed, which can be used to issue
+* further invocations.
+*
+* @return true if any downstream event has been handled.
+*/
+   protected boolean yieldToDownstream() {
+   throw new NotImplementedException("API draft");
 
 Review comment:
   Exposing it now through #setup as discussed with Aljoscha.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315134705
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -251,4 +280,79 @@ public void quiesce() {
public State getState() {
return state;
}
+
+   @Override
+   public Mailbox getDownstreamMailbox(AbstractStreamOperator operator) 
{
+   return 
getDownstreamMailbox(operator.getOperatorConfig().getChainIndex());
 
 Review comment:
   Using int priority everywhere now.
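
   One possible reading of an int priority, sketched under stated assumptions: each mail carries the chain index of the operator it belongs to, and a yielding operator only takes mails whose priority is at least a given minimum, so that it never runs mails of operators upstream of it. The class `PriorityMailboxSketch`, the `Mail` type, and the `tryTake(int)` semantics shown here are hypothetical and are not taken from the PR.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

public class PriorityMailboxSketch {
    /** A mail tagged with an int priority (assumed here to be a chain index). */
    public static final class Mail {
        public final int priority;
        public final String name;

        public Mail(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }
    }

    // Mails in arrival order.
    private final List<Mail> queue = new LinkedList<>();

    public synchronized void put(Mail mail) {
        queue.add(mail);
    }

    /** Removes and returns the oldest mail whose priority is at least minPriority. */
    public synchronized Optional<Mail> tryTake(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                return Optional.of(mail);
            }
        }
        return Optional.empty();
    }
}
```

   In this model, an operator at chain index 1 would call `tryTake(1)` and skip any mail with priority 0, while arrival order is preserved among the mails it is allowed to take.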




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315107092
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
 * Add the given stream element queue entry to the operator's stream 
element queue. This
 * operation blocks until the element has been added.
 *
-* For that it tries to put the element into the queue and if not 
successful then it waits on
-* the checkpointing lock. The checkpointing lock is also used by the 
{@link Emitter} to output
-* elements. The emitter is also responsible for notifying this method 
if the queue has capacity
-* left again, by calling notifyAll on the checkpointing lock.
-*
 * @param streamElementQueueEntry to add to the operator's queue
 * @param  Type of the stream element queue entry's result
 * @throws InterruptedException if the current thread has been 
interrupted
 */
private  void addAsyncBufferEntry(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
-   pendingStreamElementQueueEntry = streamElementQueueEntry;
-
while (!queue.tryPut(streamElementQueueEntry)) {
-   // we wait for the emitter to notify us if the queue 
has space left again
-   checkpointingLock.wait();
+   yieldToDownstream();
}
-
-   pendingStreamElementQueueEntry = null;
}
 
-   private void waitInFlightInputsFinished() throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
+   private void waitInFlightInputsFinished() {
while (!queue.isEmpty()) {
-   // wait for the emitter thread to output the remaining 
elements
-   // for that he needs the checkpointing lock and thus we 
have to free it
-   checkpointingLock.wait();
+   yieldToDownstream();
 
 Review comment:
   You are right; now that the emitter thread always enqueues results, that 
would work fine.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315070914
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
 * Add the given stream element queue entry to the operator's stream 
element queue. This
 * operation blocks until the element has been added.
 *
-* For that it tries to put the element into the queue and if not 
successful then it waits on
-* the checkpointing lock. The checkpointing lock is also used by the 
{@link Emitter} to output
-* elements. The emitter is also responsible for notifying this method 
if the queue has capacity
-* left again, by calling notifyAll on the checkpointing lock.
-*
 * @param streamElementQueueEntry to add to the operator's queue
 * @param  Type of the stream element queue entry's result
 * @throws InterruptedException if the current thread has been 
interrupted
 */
private  void addAsyncBufferEntry(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
-   pendingStreamElementQueueEntry = streamElementQueueEntry;
-
while (!queue.tryPut(streamElementQueueEntry)) {
-   // we wait for the emitter to notify us if the queue 
has space left again
-   checkpointingLock.wait();
+   yieldToDownstream();
}
-
-   pendingStreamElementQueueEntry = null;
}
 
-   private void waitInFlightInputsFinished() throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
+   private void waitInFlightInputsFinished() {
while (!queue.isEmpty()) {
-   // wait for the emitter thread to output the remaining 
elements
-   // for that he needs the checkpointing lock and thus we 
have to free it
-   checkpointingLock.wait();
+   yieldToDownstream();
 
 Review comment:
   Yes, and we have three options to solve that:
   * some sleep(1)
   * add some mutexes and conditions to be notified when new stuff arrives
   * find a cool CompletableFuture wrapper around the mutexes and conditions 
(yieldAsync and putAsync, then wait for whichever completes first)
   
   Which one would you prefer?
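
   For illustration only, the third option could look roughly like the sketch 
below. It assumes two hypothetical futures, one completed when mail arrives and 
one completed when the queue has space; neither exists in this PR, and the 
class and method names are made up.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; not part of the PR or of Flink's API.
final class FirstToCompleteSketch {

    /**
     * Blocks until at least one of the two futures has completed and reports
     * which one is done. If both are done, "mail" is reported.
     */
    static String awaitFirst(
            CompletableFuture<Void> mailAvailable,
            CompletableFuture<Void> queueHasSpace) {
        // anyOf completes as soon as either input future completes.
        CompletableFuture.anyOf(mailAvailable, queueHasSpace).join();
        return mailAvailable.isDone() ? "mail" : "space";
    }
}
```

   The caller would then either process the mail or retry the put, depending 
on the result, instead of polling with a sleep.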




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-16 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314638184
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
pendingStreamElementQueueEntry = streamElementQueueEntry;
 
while (!queue.tryPut(streamElementQueueEntry)) {
+   yieldToDownstream();
+   // wait for the emitter thread to output the remaining 
elements
// we wait for the emitter to notify us if the queue 
has space left again
-   checkpointingLock.wait();
+   checkpointingLock.wait(1);
 
 Review comment:
   wait(1) is needed so that the emitter thread gets a chance to make progress 
(I am working on getting rid of the emitter thread in a separate effort). A 
full wait would of course not allow us to interleave mailbox messages.
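
   A minimal sketch of that loop shape, assuming a hypothetical bounded queue 
and mailbox guarded by a single lock (all names here are stand-ins, not the 
operator's real fields):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the wait loop; not Flink's AsyncWaitOperator.
final class BoundedWaitSketch {
    private final Object lock = new Object();
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final int capacity;

    BoundedWaitSketch(int capacity) {
        this.capacity = capacity;
    }

    void enqueueMail(Runnable mail) {
        synchronized (lock) {
            mailbox.add(mail);
        }
    }

    /** Runs at most one pending mail; returns true if one was handled. */
    boolean yieldToDownstream() {
        Runnable mail;
        synchronized (lock) {
            mail = mailbox.poll();
        }
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    boolean tryPut(int element) {
        synchronized (lock) {
            if (queue.size() >= capacity) {
                return false;
            }
            queue.add(element);
            return true;
        }
    }

    /** Removes the head of the queue and wakes up any waiting producer. */
    Integer poll() {
        synchronized (lock) {
            Integer head = queue.poll();
            lock.notifyAll();
            return head;
        }
    }

    void put(int element) {
        while (!tryPut(element)) {
            // Interleave mailbox processing between put attempts.
            yieldToDownstream();
            synchronized (lock) {
                if (queue.size() >= capacity) {
                    try {
                        // Bounded wait: an untimed wait() would stall mail processing.
                        lock.wait(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
    }
}
```

   The one-millisecond timeout is the part under discussion: it bounds how 
long pending mail can be delayed while still releasing the lock for another 
thread.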




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-16 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314637896
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ##
 @@ -232,6 +233,24 @@ public void setup(StreamTask containingTask, 
StreamConfig config, OutputThis method should be called whenever an operator would need to 
block the task thread to wait for output
+* buffers to flush.
+* If such an operator would indeed block the task thread, deadlocks 
can arise when two such operators are
+* chained.
+* It's up to the implementor of the operator to find a good 
trade-off, between yielding and own processing.
+* If the operator never blocks the task thread during input 
processing, this method should not be called at
+* all.
+* This method indicates whether downstream events actually have 
been processed, which can be used to issue
+* further invocations.
+*
+* @return true if any downstream event has been handled.
+*/
+   protected boolean yieldToDownstream() {
+   throw new NotImplementedException("API draft");
 
 Review comment:
   I wanted to expose yieldToDownstream only to operator developers (thus 
protected). That also means I cannot add it to the interface.
   I didn't get the last part. Do you want to inject it through the factory?
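
   To make the visibility argument concrete, here is a small sketch with 
hypothetical types (not Flink's operator hierarchy). Java interface methods are 
implicitly public, so a protected hook has to live on the abstract base class:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical names throughout; this is not Flink's operator hierarchy.
interface OperatorSketch {
    // Interface methods are implicitly public, so a protected hook cannot be declared here.
    void processElement(int value);
}

abstract class AbstractOperatorSketch implements OperatorSketch {
    private final Queue<Runnable> downstreamMail = new ArrayDeque<>();

    // Called by the (hypothetical) task, not by operator code.
    final void enqueueDownstreamMail(Runnable mail) {
        downstreamMail.add(mail);
    }

    // Visible to subclasses (and to the same package), but not part of the interface.
    protected boolean yieldToDownstream() {
        Runnable mail = downstreamMail.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }
}

final class DrainingOperatorSketch extends AbstractOperatorSketch {
    int handled = 0;

    @Override
    public void processElement(int value) {
        // Keep yielding for as long as downstream mail was actually handled.
        while (yieldToDownstream()) {
            handled++;
        }
    }
}
```

   Code that only holds an OperatorSketch reference cannot call the hook, which 
is the restriction I was after.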




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-16 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314637375
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ##
 @@ -771,9 +773,11 @@ public void testClosingWithBlockedEmitter() throws 
Exception {
MockEnvironment environment = createMockEnvironment();
 
StreamTask containingTask = mock(StreamTask.class);
+   TaskMailboxImpl mailbox = new TaskMailboxImpl();
when(containingTask.getEnvironment()).thenReturn(environment);
when(containingTask.getCheckpointLock()).thenReturn(lock);
when(containingTask.getProcessingTimeService()).thenReturn(new 
TestProcessingTimeService());
+   
when(containingTask.getTaskMailboxExecutor(any())).thenReturn(new 
MailboxExecutorImpl(mailbox));
 
 Review comment:
   Yes, but afaik, it would require quite a bit of refactoring, which I'd do in 
another PR.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314300937
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -251,4 +280,79 @@ public void quiesce() {
public State getState() {
return state;
}
+
+   @Override
+   public Mailbox getDownstreamMailbox(AbstractStreamOperator operator) 
{
+   return 
getDownstreamMailbox(operator.getOperatorConfig().getChainIndex());
 
 Review comment:
   That's what I had in the first place.
   However, with respect to a possible branched operator chain (Piotr mentioned 
that this may come), I wanted to make the priority and its type an 
implementation detail.
   Nevertheless, I see your point about the cyclic coupling and would revert 
back to int priority. 
   Alternatively, we could also take the StreamConfig to avoid the cyclic 
dependency graph.
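
   A rough sketch of the int-priority variant, under the assumption that a 
higher priority value means further downstream in the chain (class and method 
names are hypothetical). The mailbox only sees an int, so it does not depend on 
the operator type:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.TreeMap;

// Hypothetical sketch; not the TaskMailboxImpl from this PR.
final class PriorityMailboxSketch {
    private final TreeMap<Integer, Queue<Runnable>> mailByPriority = new TreeMap<>();

    void put(int priority, Runnable mail) {
        mailByPriority.computeIfAbsent(priority, p -> new ArrayDeque<>()).add(mail);
    }

    /**
     * Runs one mail whose priority is strictly greater than the caller's.
     * Assumption: a greater value means further downstream.
     */
    boolean tryYieldDownstream(int callerPriority) {
        for (Queue<Runnable> mails : mailByPriority.tailMap(callerPriority, false).values()) {
            Runnable mail = mails.poll();
            if (mail != null) {
                mail.run();
                // Return right away so the map is not iterated after the mail ran.
                return true;
            }
        }
        return false;
    }
}
```

   The caller resolves its own chain index and passes it in, which is what 
removes the cyclic dependency between mailbox and operator.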




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314299848
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
pendingStreamElementQueueEntry = streamElementQueueEntry;
 
while (!queue.tryPut(streamElementQueueEntry)) {
+   yieldToDownstream();
 
 Review comment:
   Yes, I can do that, but I would first like to get general feedback on the 
design.
   Btw, the commit order has been shuffled by GitHub for some reason. This is 
actually the last commit.




[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314298805
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 ##
 @@ -1,73 +1,11 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 
 Review comment:
   Oh shoot. Thanks.

