[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-29 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r319042613
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutorFactory.java
 ##
 @@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox.execution;
+
+import java.util.function.IntFunction;
+
+/**
+ * A factory for creating mailbox executors with a given priority. The factory is usually bound to a specific task.
+ */
+@FunctionalInterface
+public interface MailboxExecutorFactory extends IntFunction {
+   @Override
+   default MailboxExecutor apply(int priority) {
 
 Review comment:
   Imo, `extends IntFunction` and this method are redundant and can be removed.
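A minimal sketch of the suggested shape: a plain functional interface with one abstract method, no `IntFunction` supertype and no bridging `default apply(int)`. `MailboxExecutor` below is a hypothetical one-method stand-in (not Flink's interface), and the method name `createExecutor` is an assumption chosen for illustration.

```java
/** Hypothetical one-method stand-in for Flink's MailboxExecutor. */
interface MailboxExecutor {
    int getPriority();
}

/**
 * A factory for creating mailbox executors with a given priority.
 * No IntFunction supertype and no default apply(int): one explicit abstract method.
 */
@FunctionalInterface
interface MailboxExecutorFactory {
    MailboxExecutor createExecutor(int priority);
}

class MailboxExecutorFactoryDemo {
    /** A task-bound factory can still be written as a lambda. */
    static MailboxExecutorFactory forTask() {
        // The inner lambda implements MailboxExecutor.getPriority().
        return priority -> () -> priority;
    }
}
```

Callers then read `factory.createExecutor(priority)` instead of the less descriptive `apply(priority)`.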


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-29 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r319038740
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ##
 @@ -1172,21 +1175,53 @@ public ExecutionConfig getExecutionConfig() {
 * @param 
 *type of the return stream
 * @return the data stream constructed
+* @see #transform(String, TypeInformation, OneInputStreamOperatorFactory)
 */
@PublicEvolving
-   public  SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator operator) {
+   public  SingleOutputStreamOperator transform(
+   String operatorName,
+   TypeInformation outTypeInfo,
+   OneInputStreamOperator operator) {
+
+   return transformImpl(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
+   }
+
+   /**
+* Method for passing user defined operators created by the given factory along with the type information that will
+* transform the DataStream.
+*
+* This method uses the rather new operator factories and should only be used when custom factories are needed.
+*
+* @param operatorName name of the operator, for logging purposes
+* @param outTypeInfo the output type of the operator
+* @param operatorFactory the factory for the operator.
+* @param  type of the return stream
+* @return the data stream constructed.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator transform(
+   String operatorName,
+   TypeInformation outTypeInfo,
+   OneInputStreamOperatorFactory operatorFactory) {
+   return transformImpl(operatorName, outTypeInfo, operatorFactory);
+   }
+
+   private  SingleOutputStreamOperator transformImpl(
 
 Review comment:
   Suggestion for an alternative name: `doTransform`.




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317631717
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java
 ##
 @@ -238,7 +238,7 @@ public void testTemporalLeftAsyncJoinWithFilter() throws Exception {
ASYNC_BUFFER_CAPACITY);
}
 
-   AsyncWaitOperator operator = new AsyncWaitOperator<>(
+   AsyncWaitOperatorFactory operator = new AsyncWaitOperatorFactory<>(
 
 Review comment:
   Rename to `operatorFactory`?




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317617074
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ##
 @@ -1194,6 +1197,37 @@ public ExecutionConfig getExecutionConfig() {
return returnStream;
}
 
+   /**
+* Method for passing user defined operators along with the type information that will transform the DataStream.
+*
+* @param operatorName name of the operator, for logging purposes
+* @param outTypeInfo the output type of the operator
+* @param operatorFactory the factory for the operator.
+* @param  type of the return stream
+* @return the data stream constructed
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo,
+   OneInputStreamOperatorFactory operatorFactory) {
 
 Review comment:
   Continuing my comment about deprecating the previous method:
   imo, offering this method as the new alternative may not be very user friendly, as users would need to create an additional layer (a factory) to provide their own operators.
   Also, it may not be easy to guarantee API stability from the runtime's point of view, as the operator factories have mostly been internal so far.
   
   Maybe mark this method as `@Internal`, at least for now? **Note** that `StreamOperatorFactory` (parent of `OneInputStreamOperatorFactory`) is also marked as `@Internal`.
   
   **Nit:** the documentation string of this method is not accurate, as users don't pass an operator instance.




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317620870
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+
+/**
+ * The factory of {@link AsyncWaitOperator}.
+ *
+ * @param  The output type of the operator
+ */
+public class AsyncWaitOperatorFactory implements OneInputStreamOperatorFactory, YieldingOperatorFactory {
+   private final AsyncFunction asyncFunction;
+   private final long timeout;
+   private final int capacity;
+   private final AsyncDataStream.OutputMode outputMode;
+   private MailboxExecutor mailboxExecutor;
+   private ChainingStrategy strategy = ChainingStrategy.HEAD;
+
+   public AsyncWaitOperatorFactory(AsyncFunction asyncFunction,
+   long timeout,
+   int capacity,
+   AsyncDataStream.OutputMode outputMode) {
+   this.asyncFunction = asyncFunction;
+   this.timeout = timeout;
+   this.capacity = capacity;
+   this.outputMode = outputMode;
+   }
+
+   @Override public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
+   this.mailboxExecutor = mailboxExecutor;
+   }
+
+   @Override public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config,
+   Output output) {
+   AsyncWaitOperator asyncWaitOperator = new AsyncWaitOperator(asyncFunction,
 
 Review comment:
   Suggestion: how about adding a runtime check, like:
   ```java
   Preconditions.checkState(mailboxExecutor != null, "...");
   ```
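A self-contained sketch of the suggested fail-fast check. `checkState` below is a local stand-in that behaves like Flink's `Preconditions.checkState(boolean, Object)` (it throws `IllegalStateException`); the factory and operator classes are hypothetical simplifications of `AsyncWaitOperatorFactory` and `AsyncWaitOperator`, not their real signatures.

```java
/** Hypothetical stand-in for the mailbox executor injected by the task. */
interface MailboxExecutor {
    void execute(Runnable command);
}

/** Hypothetical operator that needs the executor at construction time. */
class AsyncOperatorSketch {
    final MailboxExecutor mailboxExecutor;

    AsyncOperatorSketch(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }
}

class AsyncOperatorFactorySketch {
    private MailboxExecutor mailboxExecutor;

    void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    AsyncOperatorSketch createStreamOperator() {
        // Fail fast with a clear message instead of a later NullPointerException.
        checkState(mailboxExecutor != null,
            "The mailbox executor must be set before creating the operator.");
        return new AsyncOperatorSketch(mailboxExecutor);
    }

    /** Local equivalent of Preconditions.checkState(boolean, Object). */
    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}
```

The check turns a silent `null` field into an immediate, descriptive error at the point where the task wiring went wrong.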




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317627442
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -27,19 +27,19 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Implementation of {@link Mailbox} inspired by {@link java.util.concurrent.ArrayBlockingQueue} and tailored towards
+ * Implementation of {@link TaskMailbox} inspired by {@link java.util.concurrent.ArrayBlockingQueue} and tailored towards
 
 Review comment:
   Not related to changes in this PR, but the `ArrayBlockingQueue` reference is outdated: the implementation no longer uses a ring buffer.




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317601099
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
 ##
 @@ -78,7 +79,7 @@
true);
 
// create transform
-   AsyncWaitOperator operator = new AsyncWaitOperator<>(
+   AsyncWaitOperatorFactory operator = new AsyncWaitOperatorFactory<>(
 
 Review comment:
   Nit: rename `operator` to something like `operatorFactory`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317603086
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ##
 @@ -1172,8 +1173,10 @@ public ExecutionConfig getExecutionConfig() {
 * @param 
 *type of the return stream
 * @return the data stream constructed
+* @see #transform(String, TypeInformation, OneInputStreamOperatorFactory)
 */
@PublicEvolving
+   @Deprecated
public  SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator operator) {
 
 Review comment:
   You can eliminate code duplication (between this and the newly introduced method) by introducing a private method with the broader `operatorFactory` argument type, like:
   ```java
   private  SingleOutputStreamOperator transform(
           String operatorName, TypeInformation outTypeInfo, StreamOperatorFactory operatorFactory) {
       ...
   }
   ```
   
   The old methods would just delegate the call to the common implementation, for example:
   ```java
   public  SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator operator) {
       return transform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
   }
   ```
   
   Regarding the deprecation of this API: I think this API method is more user-targeted (`OneInputStreamOperator` is `@PublicEvolving`), and it's more or less fine for users to write their own operators as Java classes.
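The delegation pattern from this comment (using the `doTransform` name suggested in another comment on this PR) can be sketched with heavily simplified, hypothetical stand-ins for Flink's operator types. This is not the real `DataStream` API: `transform` returns a `String` here only so that the dispatch is observable.

```java
/** Hypothetical, heavily simplified stand-ins for Flink's operator types. */
interface StreamOperator<OUT> {
    OUT process(String input);
}

interface StreamOperatorFactory<OUT> {
    StreamOperator<OUT> createStreamOperator();
}

/** Wraps an already-built operator, in the spirit of SimpleOperatorFactory.of(operator). */
final class SimpleOperatorFactory<OUT> implements StreamOperatorFactory<OUT> {
    private final StreamOperator<OUT> operator;

    private SimpleOperatorFactory(StreamOperator<OUT> operator) {
        this.operator = operator;
    }

    static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {
        return new SimpleOperatorFactory<>(operator);
    }

    @Override
    public StreamOperator<OUT> createStreamOperator() {
        return operator;
    }
}

class DataStreamSketch {
    /** Operator-based overload: wraps the operator, then delegates. */
    <R> String transform(String operatorName, StreamOperator<R> operator) {
        return doTransform(operatorName, SimpleOperatorFactory.of(operator));
    }

    /** Factory-based overload: delegates directly. */
    <R> String transform(String operatorName, StreamOperatorFactory<R> operatorFactory) {
        return doTransform(operatorName, operatorFactory);
    }

    /** The single shared implementation, typed on the broader factory interface. */
    private <R> String doTransform(String operatorName, StreamOperatorFactory<R> operatorFactory) {
        // A real implementation would build a transformation; here we only report the factory type.
        return operatorName + " via " + operatorFactory.getClass().getSimpleName();
    }
}
```

The two public overloads then differ only in how they obtain a factory. In this sketch both stand-in types happen to be functional interfaces, so pass typed variables rather than bare lambdas to avoid an ambiguous overload.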


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317582514
 
 

 ##
 File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 ##
 @@ -72,6 +74,10 @@ protected void init() throws Exception {
// re-initialize the operator with the correct collector.
StreamOperatorFactory operatorFactory = configuration.getStreamOperatorFactory(getUserCodeClassLoader());
headOperator = operatorFactory.createStreamOperator(this, configuration, new CollectorWrapper<>(collector));
+   if (operatorFactory instanceof YieldingOperatorFactory) {
 
 Review comment:
   Should this block go before the previous line (`operatorFactory.createStreamOperator(this, configuration, new CollectorWrapper<>(collector));`)?
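Why the order can matter, in a hypothetical sketch: if a factory hands its current mailbox executor to the operator at creation time, the executor has to be injected before `createStreamOperator` is called, otherwise the operator keeps a `null`. The interfaces below are simplified stand-ins for `StreamOperatorFactory` and `YieldingOperatorFactory`, not the real Flink signatures.

```java
/** Hypothetical stand-in for the mailbox executor. */
interface MailboxExecutorStub {
    void execute(Runnable command);
}

/** Simplified stand-in for StreamOperatorFactory. */
interface OperatorFactoryStub {
    Object createStreamOperator();
}

/** Simplified stand-in for YieldingOperatorFactory. */
interface YieldingFactoryStub {
    void setMailboxExecutor(MailboxExecutorStub executor);
}

/** A factory that hands its current executor to the operator it creates. */
class CapturingFactory implements OperatorFactoryStub, YieldingFactoryStub {
    private MailboxExecutorStub executor;

    @Override
    public void setMailboxExecutor(MailboxExecutorStub executor) {
        this.executor = executor;
    }

    @Override
    public Object createStreamOperator() {
        // The "operator" here is just whatever executor the factory holds right now.
        return executor;
    }
}

class TaskInitSketch {
    /** Injects the executor first, so the operator sees it when it is constructed. */
    static Object init(OperatorFactoryStub factory, MailboxExecutorStub executor) {
        if (factory instanceof YieldingFactoryStub) {
            ((YieldingFactoryStub) factory).setMailboxExecutor(executor);
        }
        return factory.createStreamOperator();
    }
}
```

Calling `createStreamOperator()` before `setMailboxExecutor(...)` on a `CapturingFactory` returns `null`, which is the failure mode the reordering avoids.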




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317631276
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
 ##
 @@ -17,13 +17,27 @@
  */
 package org.apache.flink.table.planner.plan.nodes.common
 
+import java.util.Collections
+import java.util.concurrent.CompletableFuture
+
+import com.google.common.primitives.Primitives
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.validate.SqlValidatorUtil
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.mapping.IntPair
 
 Review comment:
   Nit: afaik, the project's import ordering convention is to list
1. Flink imports first;
2. other imports (non `java` and `javax`);
3. `javax`;
4. `java`.
   
   At least for `*.java` source files, the code style check should fail if imports are in the wrong order.
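A small sketch of the four-group ordering described above, classifying imports by package prefix. Sorting alphabetically within a group is an assumption; the authoritative rules are whatever Flink's checkstyle configuration enforces.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper ordering imports by the group convention described in the review. */
class ImportOrderSketch {
    /** 0 = Flink, 1 = other third-party, 2 = javax, 3 = java. */
    static int group(String importedName) {
        if (importedName.startsWith("org.apache.flink.")) {
            return 0;
        }
        if (importedName.startsWith("javax.")) {
            return 2;
        }
        if (importedName.startsWith("java.")) {
            return 3;
        }
        return 1;
    }

    /** Sorts by group, then alphabetically within a group (the latter is an assumption). */
    static List<String> sort(List<String> imports) {
        Comparator<String> byGroup = Comparator.comparingInt(ImportOrderSketch::group);
        List<String> sorted = new ArrayList<>(imports);
        sorted.sort(byGroup.thenComparing(Comparator.naturalOrder()));
        return sorted;
    }
}
```

Applied to the Scala imports in this hunk, `com.google.common.primitives.Primitives` and the `org.apache.calcite` imports would move ahead of `java.util.Collections` and `java.util.concurrent.CompletableFuture`.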




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317628235
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -251,4 +257,60 @@ public void quiesce() {
public State getState() {
return state;
}
+
+   @Override
+   public Mailbox getDownstreamMailbox(int priority) {
+   Preconditions.checkArgument(priority >= 0, "The priority of a downstream mailbox should be non-negative");
+   return new DownstreamMailbox(priority);
+   }
+
+   class DownstreamMailbox implements Mailbox {
+   private final int priority;
+
+   DownstreamMailbox(int priority) {
+   this.priority = priority;
+   }
+
+   @Override
+   public Optional tryTakeMail() throws MailboxStateException {
+   return TaskMailboxImpl.this.tryTakeMail(priority);
+   }
+
+   @Nonnull
+   @Override
+   public Runnable takeMail() throws InterruptedException, MailboxStateException {
+   return TaskMailboxImpl.this.takeMail(priority);
+   }
+
+   @Override
+   public void putMail(@Nonnull Runnable letter) throws MailboxStateException {
+   TaskMailboxImpl.this.putMail(letter, priority);
+   }
+
+   @Override
+   public void putFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException {
+   TaskMailboxImpl.this.putFirst(priorityLetter);
+   }
+   }
+
+   /**
+* An executable bound to a specific operator in the chain, such that it can be picked for downstream mailbox.
+*/
+   static class Mail {
+   private final Runnable runnable;
+   private final int priority;
+
+   public Mail(Runnable runnable, int priority) {
+   this.runnable = runnable;
+   this.priority = priority;
+   }
+
+   int getpriority() {
 
 Review comment:
   Typo: `getpriority` -> `getPriority`.




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317628687
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
 ##
 @@ -40,6 +43,21 @@
@Override
void execute(@Nonnull Runnable command) throws RejectedExecutionException;
 
+   /**
+* Submits the given command for execution ain the future in the mailbox thread. nd returns a Future representing
 
 Review comment:
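   Typos: "ain the future" should be "in the future", and "thread. nd returns" should be "thread and returns".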




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-26 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317625473
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -687,6 +684,10 @@ public StreamStatusMaintainer getStreamStatusMaintainer() {
return operatorChain.getStreamOutputs();
}
 
+   public IntFunction getMailboxExecutorFactory() {
 
 Review comment:
   What do you think about having an explicit factory interface instead of the overly generic `IntFunction`?




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-20 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315556884
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ##
 @@ -227,11 +233,73 @@ public void setup(StreamTask containingTask, 
StreamConfig config, OutputThis method should be called whenever an operator would need to 
block the task thread to wait for output
 
 Review comment:
   I can't say much about the transition to newer Java versions, or whether it would involve reformatting all Javadocs.
   Afaik, HTML5 also allows unpaired `<p>` tags, so the old "convention" could still be valid.
   
   I don't have a strong opinion on which style to choose, but I'm in favor of consistency and a more uniform style.




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315101347
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ##
 @@ -227,11 +233,73 @@ public void setup(StreamTask containingTask, 
StreamConfig config, OutputThis method should be called whenever an operator would need to 
block the task thread to wait for output
+* buffers to flush.
+* If such an operator would indeed block the task thread, deadlocks can arise when two such operators are
+* chained.
+* It's up to the implementor of the operator to find a good trade-off, between yielding and own processing.
+* If the operator never blocks the task thread during input processing, this method should not be called at
+* all.
+* This method indicates whether downstream events actually have been processed, which can be used to issue
+* further invocations.
+*
+* @return true if any downstream event has been handled.
+*/
+   protected boolean yieldToDownstream() {
+   if (mailboxExecutor.isMailboxThread()) {
+   boolean more = mailboxExecutor.tryYield();
+   while (more && mailboxExecutor.tryYield()) {
+   }
+   return more;
 
 Review comment:
   Alternative:
   ```java
   boolean processed = false;
   while (mailboxExecutor.tryYield()) {
   processed = true;
   }
   return processed;
   ```
   
   You could also consider calling `tryYield()` only once, for now:
   ```java
   protected boolean yieldToDownstream() {
   return mailboxExecutor.isMailboxThread() && mailboxExecutor.tryYield();
   }
   ```
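Both suggestions, made runnable against a hypothetical two-method stand-in for the mailbox executor (`isMailboxThread()` and `tryYield()`, as used in the snippets above), plus a counting test double. The interface and class names are assumptions for illustration only.

```java
/** Hypothetical stand-in exposing only the two calls used in the review snippets. */
interface YieldingExecutor {
    boolean isMailboxThread();

    boolean tryYield();
}

class YieldSketch {
    /** Drains all currently available downstream mail; reports whether any was processed. */
    static boolean yieldToDownstream(YieldingExecutor mailboxExecutor) {
        if (!mailboxExecutor.isMailboxThread()) {
            return false;
        }
        boolean processed = false;
        while (mailboxExecutor.tryYield()) {
            processed = true;
        }
        return processed;
    }

    /** Single-step variant: processes at most one mail. */
    static boolean yieldOnce(YieldingExecutor mailboxExecutor) {
        return mailboxExecutor.isMailboxThread() && mailboxExecutor.tryYield();
    }
}

/** Test double: tryYield() "runs" one of the pending mails, if any. */
class CountingExecutor implements YieldingExecutor {
    private final boolean mailboxThread;
    int pending;
    int executed;

    CountingExecutor(boolean mailboxThread, int pending) {
        this.mailboxThread = mailboxThread;
        this.pending = pending;
    }

    @Override
    public boolean isMailboxThread() {
        return mailboxThread;
    }

    @Override
    public boolean tryYield() {
        if (pending == 0) {
            return false;
        }
        pending--;
        executed++;
        return true;
    }
}
```

The draining variant keeps the caller's wait loop simple but can run many letters per call; the single-step variant bounds the work per call and leaves the looping to the caller.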




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315124537
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
 ##
 @@ -97,22 +102,18 @@ public void run() {
}
}
 
-   private void output(AsyncResult asyncResult) throws InterruptedException {
+   /**
+* Executed as a mail in the mailbox thread. Output needs to be guarded with checkpoint lock (for the time being).
+*
+* @param asyncResult the result to output.
+*/
+   private void output(AsyncResult asyncResult) {
if (asyncResult.isWatermark()) {
-   synchronized (checkpointLock) {
-   AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
+   AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
 
 Review comment:
   This action can be executed by the normal mailbox loop (not the one running in `AsyncWaitOperator` via `yieldToDownstream()`), which doesn't run letters under the checkpoint lock (yet).
   
   Also, I think you are losing the guarantee that this operation and `streamElementQueue.poll()` happen atomically.
   The task could checkpoint in between, and after a restart and restore it would emit duplicate events.
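To illustrate the atomicity concern: a minimal sketch, assuming a single checkpoint lock, in which removing an element from the queue and emitting it happen inside the same `synchronized` block that a snapshot also takes. A snapshot then sees each element either still pending or already emitted, never both. All class and method names are hypothetical; this is not the `Emitter` code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical emitter whose poll-and-emit step is atomic with respect to snapshots. */
class AtomicEmitterSketch<T> {
    private final Object checkpointLock = new Object();
    private final Queue<T> queue = new ArrayDeque<>();
    private final List<T> output = new ArrayList<>();

    void add(T element) {
        synchronized (checkpointLock) {
            queue.add(element);
        }
    }

    /** Removes the head and emits it under the same lock a snapshot takes. */
    boolean emitNext() {
        synchronized (checkpointLock) {
            T head = queue.poll();
            if (head == null) {
                return false;
            }
            output.add(head);
            return true;
        }
    }

    /** A "checkpoint": copies the pending elements while holding the same lock. */
    List<T> snapshotPending() {
        synchronized (checkpointLock) {
            return new ArrayList<>(queue);
        }
    }

    List<T> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(output);
        }
    }
}
```

If the emit and the `poll()` were split across two separately locked steps, a snapshot taken between them would record an already-emitted element as pending, and restoring from it would emit that element again.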




[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-19 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315126159
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ##
 @@ -227,11 +233,73 @@ public void setup(StreamTask containingTask, 
StreamConfig config, OutputThis method should be called whenever an operator would need to 
block the task thread to wait for output
 
 Review comment:
   Suggestion: use a new (empty) line before `<p>` and remove the closing `</p>`s. This "convention" is more common in the Flink codebase.
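For reference, a hypothetical Javadoc written in the suggested style: an empty comment line between paragraphs, an opening `<p>` at the start of each following paragraph, and no closing `</p>`. The wording is adapted from the `yieldToDownstream()` documentation quoted in this thread; the class name and method body are placeholders.

```java
class JavadocStyleExample {

    /**
     * Yields to downstream operators of the same chain.
     *
     * <p>This method should be called whenever an operator would need to block the task
     * thread to wait for output buffers to flush.
     *
     * <p>If the operator never blocks the task thread during input processing, this method
     * should not be called at all.
     *
     * @return true if any downstream event has been handled.
     */
    static boolean yieldToDownstream() {
        // Placeholder body: only the comment layout above matters for this example.
        return false;
    }
}
```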


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-16 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314741672
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -251,4 +278,74 @@ public void quiesce() {
public State getState() {
return state;
}
+
+   @Override
+   public Mailbox getDownstreamMailbox(int priority) {
+   return new DownstreamMailbox(priority);
+   }
+
+   class DownstreamMailbox implements Mailbox {
+   private final int priority;
+
+   DownstreamMailbox(int priority) {
+   this.priority = priority;
+   }
+
+   @Override
+   public boolean hasMail() {
+   lock.lock();
+   try {
+   for (Mail mail : queue) {
+   if (mail.getOperatorIndex() >= priority) {
+   return true;
+   }
+   }
+   return false;
+   } finally {
+   lock.unlock();
+   }
+   }
+
+   @Override
+   public Optional tryTakeMail() throws MailboxStateException {
+   return tryTakeDownstreamMail(priority);
+   }
+
+   @Nonnull
+   @Override
+   public Runnable takeMail() throws InterruptedException, MailboxStateException {
+   return takeDownstreamMail(priority);
+   }
+
+   @Override
+   public void putMail(@Nonnull Runnable letter) throws MailboxStateException {
+   TaskMailboxImpl.this.putMail(letter, priority);
+   }
+
+   @Override
+   public void putFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException {
+   TaskMailboxImpl.this.putFirst(priorityLetter, priority);
+   }
+   }
+
+   /**
+* An executable bound to a specific operator in the chain, such that it can be picked for downstream mailbox.
+*/
+   static class Mail {
+   private final Runnable runnable;
+   private final int operatorIndex;
 
 Review comment:
   Rename the field (and the corresponding method) to `priority`?
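A simplified sketch of what the rename could look like, assuming the field and getter become `priority` / `getPriority()`. `SimpleTaskMailbox` and its methods are hypothetical stand-ins that model only the priority filtering seen in `DownstreamMailbox#hasMail` above; they are not Flink's actual `TaskMailboxImpl`.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

/** A runnable tagged with the priority (formerly "operator index") of the operator that enqueued it. */
final class Mail {
	private final Runnable runnable;
	private final int priority;

	Mail(Runnable runnable, int priority) {
		this.runnable = runnable;
		this.priority = priority;
	}

	Runnable getRunnable() {
		return runnable;
	}

	int getPriority() {
		return priority;
	}
}

/** Hypothetical, simplified mailbox: only the priority-filtering logic is modelled. */
final class SimpleTaskMailbox {
	private final ReentrantLock lock = new ReentrantLock();
	private final ArrayDeque<Mail> queue = new ArrayDeque<>();

	void putMail(Runnable runnable, int priority) {
		lock.lock();
		try {
			queue.addLast(new Mail(runnable, priority));
		} finally {
			lock.unlock();
		}
	}

	/** Same check as DownstreamMailbox#hasMail: is any mail at or above the given priority? */
	boolean hasMail(int minPriority) {
		lock.lock();
		try {
			for (Mail mail : queue) {
				if (mail.getPriority() >= minPriority) {
					return true;
				}
			}
			return false;
		} finally {
			lock.unlock();
		}
	}

	/** Removes and returns the oldest mail whose priority is at least minPriority, if any. */
	Optional<Runnable> tryTakeMail(int minPriority) {
		lock.lock();
		try {
			Iterator<Mail> it = queue.iterator();
			while (it.hasNext()) {
				Mail mail = it.next();
				if (mail.getPriority() >= minPriority) {
					it.remove();
					return Optional.of(mail.getRunnable());
				}
			}
			return Optional.empty();
		} finally {
			lock.unlock();
		}
	}
}
```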


[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-16 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314741224
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -251,4 +278,74 @@ public void quiesce() {
public State getState() {
return state;
}
+
+   @Override
+   public Mailbox getDownstreamMailbox(int priority) {
+   return new DownstreamMailbox(priority);
 
 Review comment:
   Suggestion:
   * make the `-1` constant (in `private DownstreamMailbox downstreamMailbox = new DownstreamMailbox(-1);`) a named `static final` field (for example, `MIN_PRIORITY`);
   * and add a validation check here: `checkArgument(priority > MIN_PRIORITY)`.
   
   Similarly, you can also add a `MAX_PRIORITY` constant (for example, it can be used for the task cancellation letter) and additionally check here that the operator's priority is `< MAX_PRIORITY`.
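A hypothetical sketch of the suggested constants and checks. The value chosen for `MAX_PRIORITY` is an assumption, and the local `checkArgument` helper only stands in for Flink's `org.apache.flink.util.Preconditions.checkArgument` so that the example compiles on its own.

```java
/** Hypothetical sketch of the suggested priority bounds; names and values are assumptions. */
final class PriorityBoundsExample {
	/** Priority of the catch-all view that may take every mail (replaces the literal -1). */
	static final int MIN_PRIORITY = -1;

	/** Assumed reserved top priority, e.g. for a task cancellation letter. */
	static final int MAX_PRIORITY = Integer.MAX_VALUE;

	/** Stand-in for org.apache.flink.util.Preconditions#checkArgument. */
	static void checkArgument(boolean condition, String message) {
		if (!condition) {
			throw new IllegalArgumentException(message);
		}
	}

	/** Validates an operator's priority before a downstream mailbox view is created for it. */
	static int validateOperatorPriority(int priority) {
		checkArgument(priority > MIN_PRIORITY, "priority must be > MIN_PRIORITY: " + priority);
		checkArgument(priority < MAX_PRIORITY, "priority must be < MAX_PRIORITY: " + priority);
		return priority;
	}
}
```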


[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-16 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314742243
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -923,6 +919,17 @@ public String toString() {
return getName();
}
 
+   /**
+* Returns a new view on the MailboxExecutor for the given priority.
+* A h
 
 Review comment:
   Unfinished sentence...


[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314291085
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ##
 @@ -227,11 +230,30 @@ public void setup(StreamTask containingTask, StreamConfig config, Output

[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314286078
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
 ##
 @@ -251,4 +280,79 @@ public void quiesce() {
public State getState() {
return state;
}
+
+   @Override
+   public Mailbox getDownstreamMailbox(AbstractStreamOperator operator) {
+   return getDownstreamMailbox(operator.getOperatorConfig().getChainIndex());
 
 Review comment:
   Suggestion: implementation-wise, only `AbstractStreamOperator.getOperatorConfig().getChainIndex()` is used here.
   
   You can decouple `TaskMailboxImpl` from `AbstractStreamOperator` entirely, so that the method becomes `Mailbox getDownstreamMailbox(int priority)`.
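A minimal sketch of the suggested decoupling, under the assumption that only the chain index matters: the mailbox side accepts a plain `int`, and the caller translates its chain index into a priority. All type names here (`PriorityMailboxView`, `DownstreamMailboxProvider`, `OperatorStub`, `DecouplingExample`) are hypothetical placeholders for the PR's `Mailbox`, `TaskMailboxImpl`, and `AbstractStreamOperator`.

```java
/** Hypothetical minimal view type; stands in for the PR's Mailbox interface. */
interface PriorityMailboxView {
	int priority();
}

/** Mailbox side: depends only on an int, not on any operator class. */
interface DownstreamMailboxProvider {
	PriorityMailboxView getDownstreamMailbox(int priority);
}

/** Hypothetical operator stand-in exposing its chain index, as StreamConfig#getChainIndex() does. */
final class OperatorStub {
	private final int chainIndex;

	OperatorStub(int chainIndex) {
		this.chainIndex = chainIndex;
	}

	int getChainIndex() {
		return chainIndex;
	}
}

final class DecouplingExample {
	/** The caller (operator or task), not the mailbox, maps the chain index to a priority. */
	static PriorityMailboxView mailboxFor(OperatorStub operator, DownstreamMailboxProvider mailbox) {
		return mailbox.getDownstreamMailbox(operator.getChainIndex());
	}
}
```

With this split, `TaskMailboxImpl` would need no import of operator classes, and tests can create downstream views with arbitrary integer priorities.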


[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314286250
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxExecutor.java
 ##
 @@ -37,7 +37,6 @@
 * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. because the mailbox is
 *quiesced or closed.
 */
-   @Override
 
 Review comment:
   Revert back?


[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314280325
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
pendingStreamElementQueueEntry = streamElementQueueEntry;
 
while (!queue.tryPut(streamElementQueueEntry)) {
+   yieldToDownstream();
 
 Review comment:
   What do you think about having separate PRs for introducing the `yieldToDownstream()` implementation and its usage in `AsyncWaitOperator`?
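To make the wait loop in the diff concrete, here is a single-threaded, hypothetical model of the pattern: rather than blocking while the bounded queue is full, the task thread runs pending downstream mail, which may free capacity. Apart from `tryPut` and `yieldToDownstream`, all names are assumptions, and the real implementation presumably blocks on the downstream mailbox instead of throwing when no mail is available.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical single-threaded model of AsyncWaitOperator's "yield while the queue is full" loop. */
final class YieldingWaitLoopExample {
	private final int capacity;
	private final Queue<String> elementQueue = new ArrayDeque<>();
	/** Mail enqueued by downstream operators of the chain, simplified to plain Runnables. */
	private final Queue<Runnable> downstreamMail = new ArrayDeque<>();

	YieldingWaitLoopExample(int capacity) {
		this.capacity = capacity;
	}

	/** Non-blocking put into the bounded queue; returns false when it is full. */
	boolean tryPut(String element) {
		if (elementQueue.size() >= capacity) {
			return false;
		}
		elementQueue.add(element);
		return true;
	}

	void putDownstreamMail(Runnable mail) {
		downstreamMail.add(mail);
	}

	/** Runs one pending downstream mail; a real version would block until mail arrives. */
	void yieldToDownstream() {
		Runnable mail = downstreamMail.poll();
		if (mail == null) {
			throw new IllegalStateException("no downstream mail; the real loop would block here");
		}
		mail.run();
	}

	/** The wait loop: keep processing downstream mail until the element fits. */
	void addToWorkQueue(String element) {
		while (!tryPut(element)) {
			yieldToDownstream();
		}
	}

	Queue<String> elements() {
		return elementQueue;
	}
}
```

Because everything runs on the task thread in this model, the loop makes progress only if some downstream mail eventually frees a slot in the queue.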


[GitHub] [flink] 1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops

2019-08-15 Thread GitBox
1u0 commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding 
processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314213371
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 ##
 @@ -1,73 +1,11 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 
 Review comment:
   @AHeise, FYI: the missing license header would make the CI build fail fast.

