[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328576174
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -293,140 +252,120 @@ public void close() throws Exception {
waitInFlightInputsFinished();
}
finally {
-   Exception exception = null;
-
-   try {
-   super.close();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
-   }
-
-   try {
-   // terminate the emitter, the emitter thread 
and the executor
-   stopResources(true);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
-
-   if (exception != null) {
-   LOG.warn("Errors occurred while closing the 
AsyncWaitOperator.", exception);
-   }
+   super.close();
}
}
 
-   @Override
-   public void dispose() throws Exception {
-   Exception exception = null;
+   /**
+* Add the given stream element to the operator's stream element queue. 
This operation blocks until the element
+* has been added.
+*
+* Between two insertion attempts, this method yields the execution 
to the mailbox, such that events as well
+* as asynchronous results can be processed.
+*
+* @param streamElement to add to the operator's queue
+* @throws InterruptedException if the current thread has been 
interrupted while yielding to mailbox
+* @return a handle that allows to set the result of the async 
computation for the given element.
+*/
+   private ResultFuture addToWorkQueue(StreamElement streamElement) 
throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   try {
-   super.dispose();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
+   pendingStreamElement = streamElement;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
+   Optional> queueEntry;
+   while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) 
{
+   mailboxExecutor.yield();
}
 
-   try {
-   stopResources(false);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
+   pendingStreamElement = null;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
+   return queueEntry.get();
+   }
+
+   private void waitInFlightInputsFinished() throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   if (exception != null) {
-   throw exception;
+   while (!queue.isEmpty()) {
+   mailboxExecutor.yield();
}
}
 
/**
-* Close the operator's resources. They include the emitter thread and 
the executor to run
-* the queue's complete operation.
+* Batch output of all completed elements. Watermarks are always 
completed if it's their turn to be processed.
 *
-* @param waitForShutdown is true if the method should wait for the 
resources to be freed;
-*   otherwise false.
-* @throws InterruptedException if current thread has been interrupted
+* This method will be called from {@link 
#processWatermark(Watermark)} and from a mail processing the result
+* of an async function call.
 */
-   private void stopResources(boolean waitForShutdown) throws 
InterruptedException {
-   emitter.stop();
-   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328572264
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -293,140 +252,120 @@ public void close() throws Exception {
waitInFlightInputsFinished();
}
finally {
-   Exception exception = null;
-
-   try {
-   super.close();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
-   }
-
-   try {
-   // terminate the emitter, the emitter thread 
and the executor
-   stopResources(true);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
-
-   if (exception != null) {
-   LOG.warn("Errors occurred while closing the 
AsyncWaitOperator.", exception);
-   }
+   super.close();
}
}
 
-   @Override
-   public void dispose() throws Exception {
-   Exception exception = null;
+   /**
+* Add the given stream element to the operator's stream element queue. 
This operation blocks until the element
+* has been added.
+*
+* Between two insertion attempts, this method yields the execution 
to the mailbox, such that events as well
+* as asynchronous results can be processed.
+*
+* @param streamElement to add to the operator's queue
+* @throws InterruptedException if the current thread has been 
interrupted while yielding to mailbox
+* @return a handle that allows to set the result of the async 
computation for the given element.
+*/
+   private ResultFuture addToWorkQueue(StreamElement streamElement) 
throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   try {
-   super.dispose();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
+   pendingStreamElement = streamElement;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
+   Optional> queueEntry;
+   while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) 
{
+   mailboxExecutor.yield();
}
 
-   try {
-   stopResources(false);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
+   pendingStreamElement = null;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
+   return queueEntry.get();
+   }
+
+   private void waitInFlightInputsFinished() throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   if (exception != null) {
-   throw exception;
+   while (!queue.isEmpty()) {
+   mailboxExecutor.yield();
}
}
 
/**
-* Close the operator's resources. They include the emitter thread and 
the executor to run
-* the queue's complete operation.
+* Batch output of all completed elements. Watermarks are always 
completed if it's their turn to be processed.
 *
-* @param waitForShutdown is true if the method should wait for the 
resources to be freed;
-*   otherwise false.
-* @throws InterruptedException if current thread has been interrupted
+* This method will be called from {@link 
#processWatermark(Watermark)} and from a mail processing the result
+* of an async function call.
 */
-   private void stopResources(boolean waitForShutdown) throws 
InterruptedException {
-   emitter.stop();
-   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328571467
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -293,140 +252,120 @@ public void close() throws Exception {
waitInFlightInputsFinished();
}
finally {
-   Exception exception = null;
-
-   try {
-   super.close();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
-   }
-
-   try {
-   // terminate the emitter, the emitter thread 
and the executor
-   stopResources(true);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
-
-   if (exception != null) {
-   LOG.warn("Errors occurred while closing the 
AsyncWaitOperator.", exception);
-   }
+   super.close();
}
}
 
-   @Override
-   public void dispose() throws Exception {
-   Exception exception = null;
+   /**
+* Add the given stream element to the operator's stream element queue. 
This operation blocks until the element
+* has been added.
+*
+* Between two insertion attempts, this method yields the execution 
to the mailbox, such that events as well
+* as asynchronous results can be processed.
+*
+* @param streamElement to add to the operator's queue
+* @throws InterruptedException if the current thread has been 
interrupted while yielding to mailbox
+* @return a handle that allows to set the result of the async 
computation for the given element.
+*/
+   private ResultFuture addToWorkQueue(StreamElement streamElement) 
throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   try {
-   super.dispose();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
+   pendingStreamElement = streamElement;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
+   Optional> queueEntry;
+   while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) 
{
+   mailboxExecutor.yield();
}
 
-   try {
-   stopResources(false);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
+   pendingStreamElement = null;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
+   return queueEntry.get();
+   }
+
+   private void waitInFlightInputsFinished() throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   if (exception != null) {
-   throw exception;
+   while (!queue.isEmpty()) {
+   mailboxExecutor.yield();
}
}
 
/**
-* Close the operator's resources. They include the emitter thread and 
the executor to run
-* the queue's complete operation.
+* Batch output of all completed elements. Watermarks are always 
completed if it's their turn to be processed.
 *
-* @param waitForShutdown is true if the method should wait for the 
resources to be freed;
-*   otherwise false.
-* @throws InterruptedException if current thread has been interrupted
+* This method will be called from {@link 
#processWatermark(Watermark)} and from a mail processing the result
+* of an async function call.
 */
-   private void stopResources(boolean waitForShutdown) throws 
InterruptedException {
-   emitter.stop();
-   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328544533
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
 ##
 @@ -18,174 +18,108 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
 
 /**
  * {@link UnorderedStreamElementQueue} specific tests.
  */
 public class UnorderedStreamElementQueueTest extends TestLogger {
-   private static final long timeout = 1L;
-   private static ExecutorService executor;
-
-   @BeforeClass
-   public static void setup() {
-   executor = Executors.newFixedThreadPool(3);
-   }
-
-   @AfterClass
-   public static void shutdown() {
-   executor.shutdown();
-
-   try {
-   if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
-   executor.shutdownNow();
-   }
-   } catch (InterruptedException interrupted) {
-   executor.shutdownNow();
-
-   Thread.currentThread().interrupt();
-   }
-   }
-
/**
 * Tests that only elements before the oldest watermark are returned if 
they are completed.
 */
@Test
-   public void testCompletionOrder() throws Exception {
-   OperatorActions operatorActions = mock(OperatorActions.class);
-
-   final UnorderedStreamElementQueue queue = new 
UnorderedStreamElementQueue(8, executor, operatorActions);
-
-   StreamRecordQueueEntry record1 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
-   StreamRecordQueueEntry record2 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
-   WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new 
Watermark(2L));
-   StreamRecordQueueEntry record3 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
-   StreamRecordQueueEntry record4 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(4, 4L));
-   WatermarkQueueEntry watermark2 = new WatermarkQueueEntry(new 
Watermark(5L));
-   StreamRecordQueueEntry record5 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(5, 6L));
-   StreamRecordQueueEntry record6 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(6, 7L));
-
-   List> entries = 
Arrays.asList(record1, record2, watermark1, record3,
-   record4, watermark2, record5, record6);
-
-   // The queue should look like R1, R2, W1, R3, R4, W2, R5, R6
-   for (StreamElementQueueEntry entry : entries) {
-   queue.put(entry);
-   }
-
-   Assert.assertTrue(8 == queue.size());
-
-   CompletableFuture firstPoll = 
CompletableFuture.supplyAsync(
-   () -> {
-   try {
-   return queue.poll();
-   } catch (InterruptedException e) {
-   throw new CompletionException(e);
-   }
-   },
-   executor);
-
-   // this should not fulfill the poll, because R3 is behind W1
-   record3.complete(Collections.emptyList());
+   public void testCompletionOrder() {
+   final UnorderedStreamElementQueue queue = new 
UnorderedStreamElementQueue<>(8);
 
-   Thread.sleep(10L);
+   ResultFuture record1 = putSucessfully(queue, new 
StreamRecord<>(1, 0L));
+   ResultFuture record2 = 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328422187
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
 
 Review comment:
   Yes. Elements within a stage could overtake each other. One more thing for 
further confirmation: how to guarantee that two stages can not overtake each 
other? 
   
   e.g. assuming we have four stages : {a, b, c} {watermark1} {d, e, f} 
{watermark2}, and the first stage is the {a, b, c}. If the user function 
completes the element `e` in third stage firstly, that means the collection of 
`e` would be emitted to output before the first stage? Or I missed something 
else.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328541881
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
 ##
 @@ -18,174 +18,108 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
 
 /**
  * {@link UnorderedStreamElementQueue} specific tests.
  */
 public class UnorderedStreamElementQueueTest extends TestLogger {
-   private static final long timeout = 1L;
-   private static ExecutorService executor;
-
-   @BeforeClass
-   public static void setup() {
-   executor = Executors.newFixedThreadPool(3);
-   }
-
-   @AfterClass
-   public static void shutdown() {
-   executor.shutdown();
-
-   try {
-   if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
-   executor.shutdownNow();
-   }
-   } catch (InterruptedException interrupted) {
-   executor.shutdownNow();
-
-   Thread.currentThread().interrupt();
-   }
-   }
-
/**
 * Tests that only elements before the oldest watermark are returned if 
they are completed.
 */
@Test
-   public void testCompletionOrder() throws Exception {
-   OperatorActions operatorActions = mock(OperatorActions.class);
-
-   final UnorderedStreamElementQueue queue = new 
UnorderedStreamElementQueue(8, executor, operatorActions);
-
-   StreamRecordQueueEntry record1 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
-   StreamRecordQueueEntry record2 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
-   WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new 
Watermark(2L));
-   StreamRecordQueueEntry record3 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
-   StreamRecordQueueEntry record4 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(4, 4L));
-   WatermarkQueueEntry watermark2 = new WatermarkQueueEntry(new 
Watermark(5L));
-   StreamRecordQueueEntry record5 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(5, 6L));
-   StreamRecordQueueEntry record6 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(6, 7L));
-
-   List> entries = 
Arrays.asList(record1, record2, watermark1, record3,
-   record4, watermark2, record5, record6);
-
-   // The queue should look like R1, R2, W1, R3, R4, W2, R5, R6
-   for (StreamElementQueueEntry entry : entries) {
-   queue.put(entry);
-   }
-
-   Assert.assertTrue(8 == queue.size());
-
-   CompletableFuture firstPoll = 
CompletableFuture.supplyAsync(
-   () -> {
-   try {
-   return queue.poll();
-   } catch (InterruptedException e) {
-   throw new CompletionException(e);
-   }
-   },
-   executor);
-
-   // this should not fulfill the poll, because R3 is behind W1
-   record3.complete(Collections.emptyList());
+   public void testCompletionOrder() {
+   final UnorderedStreamElementQueue queue = new 
UnorderedStreamElementQueue<>(8);
 
-   Thread.sleep(10L);
+   ResultFuture record1 = putSucessfully(queue, new 
StreamRecord<>(1, 0L));
+   ResultFuture record2 = 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328538591
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
 ##
 @@ -18,253 +18,117 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.OrderedStreamElementQueueType;
-import static 
org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.UnorderedStreamElementQueueType;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putUnsucessfully;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the basic functionality of {@link StreamElementQueue}. The basic 
operations consist
  * of putting and polling elements from the queue.
  */
 @RunWith(Parameterized.class)
 public class StreamElementQueueTest extends TestLogger {
-
-   private static final long timeout = 1L;
-   private static ExecutorService executor;
-
-   @BeforeClass
-   public static void setup() {
-   executor = Executors.newFixedThreadPool(3);
-   }
-
-   @AfterClass
-   public static void shutdown() {
-   executor.shutdown();
-
-   try {
-   if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
-   executor.shutdownNow();
-   }
-   } catch (InterruptedException interrupted) {
-   executor.shutdownNow();
-
-   Thread.currentThread().interrupt();
-   }
-   }
-
-   enum StreamElementQueueType {
-   OrderedStreamElementQueueType,
-   UnorderedStreamElementQueueType
-   }
-
@Parameterized.Parameters
-   public static Collection 
streamElementQueueTypes() {
-   return Arrays.asList(OrderedStreamElementQueueType, 
UnorderedStreamElementQueueType);
+   public static Collection 
streamElementQueueTypes() {
 
 Review comment:
   streamElementQueueTypes -> outputModes


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328538575
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
 ##
 @@ -18,253 +18,117 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.OrderedStreamElementQueueType;
-import static 
org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.UnorderedStreamElementQueueType;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putUnsucessfully;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the basic functionality of {@link StreamElementQueue}. The basic 
operations consist
  * of putting and polling elements from the queue.
  */
 @RunWith(Parameterized.class)
 public class StreamElementQueueTest extends TestLogger {
-
-   private static final long timeout = 1L;
-   private static ExecutorService executor;
-
-   @BeforeClass
-   public static void setup() {
-   executor = Executors.newFixedThreadPool(3);
-   }
-
-   @AfterClass
-   public static void shutdown() {
-   executor.shutdown();
-
-   try {
-   if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
-   executor.shutdownNow();
-   }
-   } catch (InterruptedException interrupted) {
-   executor.shutdownNow();
-
-   Thread.currentThread().interrupt();
-   }
-   }
-
-   enum StreamElementQueueType {
-   OrderedStreamElementQueueType,
-   UnorderedStreamElementQueueType
-   }
-
@Parameterized.Parameters
-   public static Collection 
streamElementQueueTypes() {
-   return Arrays.asList(OrderedStreamElementQueueType, 
UnorderedStreamElementQueueType);
+   public static Collection 
streamElementQueueTypes() {
+   return Arrays.asList(AsyncDataStream.OutputMode.ORDERED, 
AsyncDataStream.OutputMode.UNORDERED);
}
 
-   private final StreamElementQueueType streamElementQueueType;
+   private final AsyncDataStream.OutputMode outputMode;
 
-   public StreamElementQueueTest(StreamElementQueueType 
streamElementQueueType) {
-   this.streamElementQueueType = 
Preconditions.checkNotNull(streamElementQueueType);
+   public StreamElementQueueTest(AsyncDataStream.OutputMode outputMode) {
+   this.outputMode = Preconditions.checkNotNull(outputMode);
}
 
-   public StreamElementQueue createStreamElementQueue(int capacity, 
OperatorActions operatorActions) {
-   switch (streamElementQueueType) {
-   case OrderedStreamElementQueueType:
-   return new OrderedStreamElementQueue(capacity, 
executor, operatorActions);
-   case UnorderedStreamElementQueueType:
-   return new 
UnorderedStreamElementQueue(capacity, executor, operatorActions);
+   public StreamElementQueue createStreamElementQueue(int 
capacity) {
 
 Review comment:
   public -> private


This is an automated message from the Apache 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328526479
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
 ##
 @@ -18,110 +18,82 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
+
 
 /**
  * {@link OrderedStreamElementQueue} specific tests.
  */
 public class OrderedStreamElementQueueTest extends TestLogger {
-
-   private static final long timeout = 1L;
-   private static ExecutorService executor;
-
-   @BeforeClass
-   public static void setup() {
-   executor = Executors.newFixedThreadPool(3);
-   }
-
-   @AfterClass
-   public static void shutdown() {
-   executor.shutdown();
-
-   try {
-   if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
-   executor.shutdownNow();
-   }
-   } catch (InterruptedException interrupted) {
-   executor.shutdownNow();
-
-   Thread.currentThread().interrupt();
-   }
-   }
-
/**
 * Tests that only the head element is pulled from the ordered queue if 
it has been
 * completed.
 */
@Test
-   public void testCompletionOrder() throws Exception {
-   OperatorActions operatorActions = mock(OperatorActions.class);
-   final OrderedStreamElementQueue queue = new 
OrderedStreamElementQueue(4, executor, operatorActions);
-
-   StreamRecordQueueEntry entry1 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
-   StreamRecordQueueEntry entry2 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
-   WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new 
Watermark(2L));
-   StreamRecordQueueEntry entry4 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
-
-   List> expected = 
Arrays.asList(entry1, entry2, entry3, entry4);
-
-   for (StreamElementQueueEntry entry : expected) {
-   queue.put(entry);
-   }
+   public void testCompletionOrder() {
+   final OrderedStreamElementQueue queue = new 
OrderedStreamElementQueue<>(4);
 
-   CompletableFuture> pollOperation = 
CompletableFuture.supplyAsync(
-   () -> {
-   List result = new ArrayList<>(4);
-   while (!queue.isEmpty()) {
-   try {
-   result.add(queue.poll());
-   } catch (InterruptedException e) {
-   throw new 
CompletionException(e);
-   }
-   }
-
-   return result;
-   },
-   executor);
-
-   Thread.sleep(10L);
-
-   Assert.assertFalse(pollOperation.isDone());
-
-   entry2.complete(Collections.emptyList());
-
-   entry4.complete(Collections.emptyList());
-
-   Thread.sleep(10L);
+   ResultFuture entry1 = putSucessfully(queue, new 
StreamRecord<>(1, 0L));
+   ResultFuture entry2 = putSucessfully(queue, new 
StreamRecord<>(2, 1L));
+   putSucessfully(queue, new Watermark(2L));
+   ResultFuture entry4 = putSucessfully(queue, new 
StreamRecord<>(3, 3L));
 
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328525293
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
 ##
 @@ -18,110 +18,82 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
+
 
 /**
  * {@link OrderedStreamElementQueue} specific tests.
  */
 public class OrderedStreamElementQueueTest extends TestLogger {
-
-   private static final long timeout = 1L;
-   private static ExecutorService executor;
-
-   @BeforeClass
-   public static void setup() {
-   executor = Executors.newFixedThreadPool(3);
-   }
-
-   @AfterClass
-   public static void shutdown() {
-   executor.shutdown();
-
-   try {
-   if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
-   executor.shutdownNow();
-   }
-   } catch (InterruptedException interrupted) {
-   executor.shutdownNow();
-
-   Thread.currentThread().interrupt();
-   }
-   }
-
/**
 * Tests that only the head element is pulled from the ordered queue if 
it has been
 * completed.
 */
@Test
-   public void testCompletionOrder() throws Exception {
-   OperatorActions operatorActions = mock(OperatorActions.class);
-   final OrderedStreamElementQueue queue = new 
OrderedStreamElementQueue(4, executor, operatorActions);
-
-   StreamRecordQueueEntry entry1 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
-   StreamRecordQueueEntry entry2 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
-   WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new 
Watermark(2L));
-   StreamRecordQueueEntry entry4 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
-
-   List> expected = 
Arrays.asList(entry1, entry2, entry3, entry4);
-
-   for (StreamElementQueueEntry entry : expected) {
-   queue.put(entry);
-   }
+   public void testCompletionOrder() {
+   final OrderedStreamElementQueue queue = new 
OrderedStreamElementQueue<>(4);
 
-   CompletableFuture> pollOperation = 
CompletableFuture.supplyAsync(
-   () -> {
-   List result = new ArrayList<>(4);
-   while (!queue.isEmpty()) {
-   try {
-   result.add(queue.poll());
-   } catch (InterruptedException e) {
-   throw new 
CompletionException(e);
-   }
-   }
-
-   return result;
-   },
-   executor);
-
-   Thread.sleep(10L);
-
-   Assert.assertFalse(pollOperation.isDone());
-
-   entry2.complete(Collections.emptyList());
-
-   entry4.complete(Collections.emptyList());
-
-   Thread.sleep(10L);
+   ResultFuture entry1 = putSucessfully(queue, new 
StreamRecord<>(1, 0L));
+   ResultFuture entry2 = putSucessfully(queue, new 
StreamRecord<>(2, 1L));
+   putSucessfully(queue, new Watermark(2L));
+   ResultFuture entry4 = putSucessfully(queue, new 
StreamRecord<>(3, 3L));
 
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328525357
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
 ##
 @@ -18,110 +18,82 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
+
 
 /**
  * {@link OrderedStreamElementQueue} specific tests.
  */
 public class OrderedStreamElementQueueTest extends TestLogger {
-
-   private static final long timeout = 1L;
-   private static ExecutorService executor;
-
-   @BeforeClass
-   public static void setup() {
-   executor = Executors.newFixedThreadPool(3);
-   }
-
-   @AfterClass
-   public static void shutdown() {
-   executor.shutdown();
-
-   try {
-   if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
-   executor.shutdownNow();
-   }
-   } catch (InterruptedException interrupted) {
-   executor.shutdownNow();
-
-   Thread.currentThread().interrupt();
-   }
-   }
-
/**
 * Tests that only the head element is pulled from the ordered queue if 
it has been
 * completed.
 */
@Test
-   public void testCompletionOrder() throws Exception {
-   OperatorActions operatorActions = mock(OperatorActions.class);
-   final OrderedStreamElementQueue queue = new 
OrderedStreamElementQueue(4, executor, operatorActions);
-
-   StreamRecordQueueEntry entry1 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
-   StreamRecordQueueEntry entry2 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
-   WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new 
Watermark(2L));
-   StreamRecordQueueEntry entry4 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
-
-   List> expected = 
Arrays.asList(entry1, entry2, entry3, entry4);
-
-   for (StreamElementQueueEntry entry : expected) {
-   queue.put(entry);
-   }
+   public void testCompletionOrder() {
+   final OrderedStreamElementQueue queue = new 
OrderedStreamElementQueue<>(4);
 
-   CompletableFuture> pollOperation = 
CompletableFuture.supplyAsync(
-   () -> {
-   List result = new ArrayList<>(4);
-   while (!queue.isEmpty()) {
-   try {
-   result.add(queue.poll());
-   } catch (InterruptedException e) {
-   throw new 
CompletionException(e);
-   }
-   }
-
-   return result;
-   },
-   executor);
-
-   Thread.sleep(10L);
-
-   Assert.assertFalse(pollOperation.isDone());
-
-   entry2.complete(Collections.emptyList());
-
-   entry4.complete(Collections.emptyList());
-
-   Thread.sleep(10L);
+   ResultFuture entry1 = putSucessfully(queue, new 
StreamRecord<>(1, 0L));
+   ResultFuture entry2 = putSucessfully(queue, new 
StreamRecord<>(2, 1L));
+   putSucessfully(queue, new Watermark(2L));
+   ResultFuture entry4 = putSucessfully(queue, new 
StreamRecord<>(3, 3L));
 
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328525320
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
 ##
 @@ -18,110 +18,82 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
+
 
 /**
  * {@link OrderedStreamElementQueue} specific tests.
  */
 public class OrderedStreamElementQueueTest extends TestLogger {
-
-   private static final long timeout = 1L;
-   private static ExecutorService executor;
-
-   @BeforeClass
-   public static void setup() {
-   executor = Executors.newFixedThreadPool(3);
-   }
-
-   @AfterClass
-   public static void shutdown() {
-   executor.shutdown();
-
-   try {
-   if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
-   executor.shutdownNow();
-   }
-   } catch (InterruptedException interrupted) {
-   executor.shutdownNow();
-
-   Thread.currentThread().interrupt();
-   }
-   }
-
/**
 * Tests that only the head element is pulled from the ordered queue if 
it has been
 * completed.
 */
@Test
-   public void testCompletionOrder() throws Exception {
-   OperatorActions operatorActions = mock(OperatorActions.class);
-   final OrderedStreamElementQueue queue = new 
OrderedStreamElementQueue(4, executor, operatorActions);
-
-   StreamRecordQueueEntry entry1 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
-   StreamRecordQueueEntry entry2 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
-   WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new 
Watermark(2L));
-   StreamRecordQueueEntry entry4 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
-
-   List> expected = 
Arrays.asList(entry1, entry2, entry3, entry4);
-
-   for (StreamElementQueueEntry entry : expected) {
-   queue.put(entry);
-   }
+   public void testCompletionOrder() {
+   final OrderedStreamElementQueue queue = new 
OrderedStreamElementQueue<>(4);
 
-   CompletableFuture> pollOperation = 
CompletableFuture.supplyAsync(
-   () -> {
-   List result = new ArrayList<>(4);
-   while (!queue.isEmpty()) {
-   try {
-   result.add(queue.poll());
-   } catch (InterruptedException e) {
-   throw new 
CompletionException(e);
-   }
-   }
-
-   return result;
-   },
-   executor);
-
-   Thread.sleep(10L);
-
-   Assert.assertFalse(pollOperation.isDone());
-
-   entry2.complete(Collections.emptyList());
-
-   entry4.complete(Collections.emptyList());
-
-   Thread.sleep(10L);
+   ResultFuture entry1 = putSucessfully(queue, new 
StreamRecord<>(1, 0L));
+   ResultFuture entry2 = putSucessfully(queue, new 
StreamRecord<>(2, 1L));
+   putSucessfully(queue, new Watermark(2L));
+   ResultFuture entry4 = putSucessfully(queue, new 
StreamRecord<>(3, 3L));
 
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328521088
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##
 @@ -19,68 +19,56 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for stream element queues for the {@link AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue {
 
/**
-* Put the given element in the queue if capacity is left. If not, then 
block until this is
-* the case.
+* Tries to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
+* the queue is full.
 *
-* @param streamElementQueueEntry to be put into the queue
-* @param  Type of the entries future value
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
-*/
-void put(StreamElementQueueEntry streamElementQueueEntry) throws 
InterruptedException;
-
-   /**
-* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity
-* left and fails if the queue is full.
+* This method returns a handle to the inserted element that allows 
to set the result of the computation.
 *
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the entries future value
-* @return True if the entry could be inserted; otherwise false
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
+* @param streamElement the element to be inserted.
+* @return A handle to the element if successful or {@link 
Optional#empty()} otherwise.
 */
-boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException;
+   Optional> tryPut(StreamElement streamElement);
 
/**
-* Peek at the head of the queue and return the first completed {@link 
AsyncResult}. This
-* operation is a blocking operation and only returns once a completed 
async result has been
-* found.
+* Emits one completed element from the head of this queue into the 
given output.
 *
-* @return Completed {@link AsyncResult}
-* @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-*  completed async result.
+* Will not emit any element if no element has been completed (check 
{@link #hasCompletedElements()} before entering
+* any critical section).
+*
+* @param output the output into which to emit
 */
-   AsyncResult peekBlockingly() throws InterruptedException;
+   void emitCompletedElement(TimestampedCollector output);
 
/**
-* Poll the first completed {@link AsyncResult} from the head of this 
queue. This operation is
-* blocking and only returns once a completed async result has been 
found.
+* Checks if there is at least one completed element.
 
 Review comment:
   This description is not very accurate for the ordered case. For order case, 
it must be the head element completed in queue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328471027
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.util.CollectorOutput;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility for putting elements inside a {@link StreamElementQueue}.
+ */
+class QueueUtil {
+   static ResultFuture putSucessfully(StreamElementQueue 
queue, StreamElement streamElement) {
+   Optional> resultFuture = 
queue.tryPut(streamElement);
+   assertTrue(resultFuture.isPresent());
+   return resultFuture.get();
+   }
+
+   static void putUnsucessfully(StreamElementQueue queue, 
StreamElement streamElement) {
+   Optional> resultFuture = 
queue.tryPut(streamElement);
+   assertFalse(resultFuture.isPresent());
+   }
+
+   /**
+* Pops all completed elements from the head of this queue.
+*
+* @return Completed elements or empty list of none exists.
+*/
+   static List popCompleted(StreamElementQueue 
queue) {
+   final List completed = new ArrayList<>();
+   TimestampedCollector collector = new 
TimestampedCollector<>(new CollectorOutput<>(completed));
+   while (queue.hasCompletedElements()) {
+   queue.emitCompletedElement(collector);
+   }
+   collector.close();
 
 Review comment:
   ATM the `close` actually did nothing, but the proposed semantic from 
`Collector#close` would flush buffered data. If so I wonder it might have 
potential risk to call `close` before return if the `close` implementation is 
changed to clear the list future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328471027
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.util.CollectorOutput;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility for putting elements inside a {@link StreamElementQueue}.
+ */
+class QueueUtil {
+   static ResultFuture putSucessfully(StreamElementQueue 
queue, StreamElement streamElement) {
+   Optional> resultFuture = 
queue.tryPut(streamElement);
+   assertTrue(resultFuture.isPresent());
+   return resultFuture.get();
+   }
+
+   static void putUnsucessfully(StreamElementQueue queue, 
StreamElement streamElement) {
+   Optional> resultFuture = 
queue.tryPut(streamElement);
+   assertFalse(resultFuture.isPresent());
+   }
+
+   /**
+* Pops all completed elements from the head of this queue.
+*
+* @return Completed elements or empty list of none exists.
+*/
+   static List popCompleted(StreamElementQueue 
queue) {
+   final List completed = new ArrayList<>();
+   TimestampedCollector collector = new 
TimestampedCollector<>(new CollectorOutput<>(completed));
+   while (queue.hasCompletedElements()) {
+   queue.emitCompletedElement(collector);
+   }
+   collector.close();
 
 Review comment:
   ATM the `close` actually did nothing, but the proposed semantic from 
`Collector#close` would flush buffered data. If so I wonder it might have 
potential risk to call `close` before return it if the `close` implementation 
is changed to clear the list future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328471027
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.util.CollectorOutput;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility for putting elements inside a {@link StreamElementQueue}.
+ */
+class QueueUtil {
+   static ResultFuture putSucessfully(StreamElementQueue 
queue, StreamElement streamElement) {
+   Optional> resultFuture = 
queue.tryPut(streamElement);
+   assertTrue(resultFuture.isPresent());
+   return resultFuture.get();
+   }
+
+   static void putUnsucessfully(StreamElementQueue queue, 
StreamElement streamElement) {
+   Optional> resultFuture = 
queue.tryPut(streamElement);
+   assertFalse(resultFuture.isPresent());
+   }
+
+   /**
+* Pops all completed elements from the head of this queue.
+*
+* @return Completed elements or empty list of none exists.
+*/
+   static List popCompleted(StreamElementQueue 
queue) {
+   final List completed = new ArrayList<>();
+   TimestampedCollector collector = new 
TimestampedCollector<>(new CollectorOutput<>(completed));
+   while (queue.hasCompletedElements()) {
+   queue.emitCompletedElement(collector);
+   }
+   collector.close();
 
 Review comment:
   ATM the `close` actually did nothing, but the proposed semantic from 
`Collector#close` would flush buffered data. If so I wonder it might have 
potential risk to clear the `completed` list before return if the 
implementation is changed future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328466285
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.util.CollectorOutput;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility for putting elements inside a {@link StreamElementQueue}.
+ */
+class QueueUtil {
+   static ResultFuture putSucessfully(StreamElementQueue 
queue, StreamElement streamElement) {
+   Optional> resultFuture = 
queue.tryPut(streamElement);
+   assertTrue(resultFuture.isPresent());
+   return resultFuture.get();
+   }
+
+   static void putUnsucessfully(StreamElementQueue queue, 
StreamElement streamElement) {
+   Optional> resultFuture = 
queue.tryPut(streamElement);
+   assertFalse(resultFuture.isPresent());
+   }
+
+   /**
+* Pops all completed elements from the head of this queue.
+*
+* @return Completed elements or empty list of none exists.
 
 Review comment:
   `empty list if none exists`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328464439
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.util.CollectorOutput;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility for putting elements inside a {@link StreamElementQueue}.
+ */
+class QueueUtil {
+   static ResultFuture putSucessfully(StreamElementQueue 
queue, StreamElement streamElement) {
+   Optional> resultFuture = 
queue.tryPut(streamElement);
+   assertTrue(resultFuture.isPresent());
+   return resultFuture.get();
+   }
+
+   static void putUnsucessfully(StreamElementQueue queue, 
StreamElement streamElement) {
 
 Review comment:
   typo: sucess


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328464339
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.util.CollectorOutput;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility for putting elements inside a {@link StreamElementQueue}.
+ */
+class QueueUtil {
+   static ResultFuture putSucessfully(StreamElementQueue 
queue, StreamElement streamElement) {
 
 Review comment:
   typo: Sucess -> Success


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328463014
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
 ##
 @@ -18,110 +18,82 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
+
 
 Review comment:
   remove extra empty line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328462756
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ##
 @@ -100,6 +102,9 @@
 public class AsyncWaitOperatorTest extends TestLogger {
private static final long TIMEOUT = 1000L;
 
+   @Rule
+   public Timeout timeoutRule = new Timeout(10, TimeUnit.SECONDS);
 
 Review comment:
   Why we need this change? 
   Some previous tests were configured separately as 2 seconds, but now we 
unify it for 10 seconds for all the cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328459063
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
 ##
 @@ -71,4 +73,19 @@ public void eraseTimestamp() {
public void close() {
output.close();
}
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+   output.emitWatermark(mark);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) 
{
 
 Review comment:
   It is involved in three new methods by implementing `Output` interface 
instead. But actually we only need the `emitWatermark` method, and the left two 
methods would never be touched in current codes. If so, another option is that 
we can only define `emitWatermark` method here and still implement `Collector` 
interface.
   
   Or we could refactor the previous usages of `TimestampedCollector` to reuse 
the other two methods for avoiding touching the internal wrapped `output` 
directly. It might be out of this PR scope or a separate hotfix commit if you 
want.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-26 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328459063
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
 ##
 @@ -71,4 +73,19 @@ public void eraseTimestamp() {
public void close() {
output.close();
}
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+   output.emitWatermark(mark);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) 
{
 
 Review comment:
   It is involved in three new methods by implementing `Output` interface 
instead. But actually we only need the `emitWatermark` method, and the left two 
methods would never be touched in codes. If so, another option is that we can 
only define `emitWatermark` method here and still implement `Collector` 
interface.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328422187
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
 
 Review comment:
   Yes. Elements within a stage could overtake each other. One more thing for 
further confirmation: how to guarantee that two stages can not overtake each 
other? 
   
   e.g. assuming we have four stages : {a, b, c} {watermark1} {d, e, f} 
{watermark2}, and the first stage is the {a, b, c}. If the user function 
completes the element `e` in third stage firstly, that means the collection of 
`e` would be emitted to output before the first stage? Or I missed something 
else.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328416725
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
 ##
 @@ -200,7 +200,7 @@ private boolean processMail(TaskMailbox mailbox) throws 
MailboxStateException, I
try {
maybeLetter.get().run();
} catch (Exception e) {
-   e.printStackTrace();
+   throw new IllegalStateException("Cannot process 
mail " + maybeLetter.get(), e);
 
 Review comment:
   Actually I concerned two issues:
   
   - `IllegalStateException` seems not very accurate here.
   
   - If we want to provide more message for debugging, maybe we could use 
`ExceptionUtils.rethrow(e, "Cannot process mail " + maybeLetter.get());`. But I 
am not sure whether `maybeLetter.get()` could give any helpful info here. If 
the debug info is useless, it seems simple to remove `try...catch` directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328416725
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
 ##
 @@ -200,7 +200,7 @@ private boolean processMail(TaskMailbox mailbox) throws 
MailboxStateException, I
try {
maybeLetter.get().run();
} catch (Exception e) {
-   e.printStackTrace();
+   throw new IllegalStateException("Cannot process 
mail " + maybeLetter.get(), e);
 
 Review comment:
   Actually I concerned two issues here:
   
   - `IllegalStateException` seems not very accurate here.
   
   - If we want to provide more message for debugging, maybe we could use 
`ExceptionUtils.rethrow(e, "Cannot process mail " + maybeLetter.get());`. But I 
am not sure whether `maybeLetter.get()` could give any helpful info here. If 
the debug info is useless, it seems simple to remove `try...catch` directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328414259
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
 ##
 @@ -200,7 +200,7 @@ private boolean processMail(TaskMailbox mailbox) throws 
MailboxStateException, I
try {
maybeLetter.get().run();
} catch (Exception e) {
-   e.printStackTrace();
+   throw new IllegalStateException("Cannot process 
mail " + maybeLetter.get(), e);
 
 Review comment:
   From the description of `AsyncExceptionHandler`, it is proposed to used by 
other threads except the main task thread. AFAIK it is only used by 
`SplitReader` in `ContinuousFileReaderOperator` now. For this case if the 
`processMail` is executed in the main task thread, maybe it is out of scope of 
`AsyncExceptionHandler`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328069833
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java
 ##
 @@ -200,7 +200,7 @@ private boolean processMail(TaskMailbox mailbox) throws 
MailboxStateException, I
try {
maybeLetter.get().run();
} catch (Exception e) {
-   e.printStackTrace();
+   throw new IllegalStateException("Cannot process 
mail " + maybeLetter.get(), e);
 
 Review comment:
   If we want to interrupt the execution for any exceptions during looping the 
letters, I am wondering whether this wrapped exception provides any helpful 
debug information. If not, we could remove `try..catch` clauses here for simple.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328066958
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
-   try {
-   if (firstSet.remove(streamElementQueueEntry)) {
-   completedQueue.offer(streamElementQueueEntry);
+   public StagedStreamRecordQueueEntry(StreamRecord 
inputRecord, Stage stage) {
+   super(inputRecord);
+   this.stage = stage;
+   }
 
-   while (firstSet.isEmpty() && firstSet != 
lastSet) {
-   firstSet = uncompletedQueue.poll();
+   @Override
+   public void complete(Collection result) {
+   super.complete(result);
+   this.stage.completed(this);
+   }
+   }
 
-   Iterator> it 
= firstSet.iterator();
+   /**
+* A stage is a collection of queue entries that can be completed in 
arbitrary order.
+*/
+   static class Stage {
+   /** Undrained finished elements. */
+   private final Queue> 
completedQueue;
 
-   while (it.hasNext()) {
-   StreamElementQueueEntry 
bufferEntry = it.next();
+   /** Finished and unfinished input elements. Used solely for 
checkpointing. */
+   private final Set> pendingElements;
 
-   if (bufferEntry.isDone()) {
-   
completedQueue.offer(bufferEntry);
-   it.remove();
-   }
-   }
-   }
+   Stage(int initialCapacity) {
+   completedQueue = new ArrayDeque<>(initialCapacity);
+   pendingElements = new HashSet<>(initialCapacity);
+   }
 
-   LOG.debug("Signal unordered stream element 
queue has completed entries.");
-   hasCompletedEntries.signalAll();
+   /**
+* Signal that an entry finished computation.
+*/
+   void completed(StreamElementQueueEntry elementQueueEntry) {
+   // adding only to completed queue if not completed 
before
+   // there may be a real result coming after the actual 
result, which is updated in the queue entry but
+   // the entry is not readded to the complete queue
+   if (pendingElements.remove(elementQueueEntry)) {
+   completedQueue.add(elementQueueEntry);
 
 Review comment:
   In the past, after handling the current notified completed entry, it would 
fetch the next set and iterate to check whether there are completed entries in 
the next set. I do not know why it needs to do that before.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328064864
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
 ##
 @@ -19,31 +19,44 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.Preconditions;
 
-import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
 
 /**
  * {@link StreamElementQueueEntry} implementation for the {@link Watermark}.
  */
 @Internal
-public class WatermarkQueueEntry extends StreamElementQueueEntry 
implements AsyncWatermarkResult {
+class WatermarkQueueEntry implements StreamElementQueueEntry {
+   @Nonnull
+   private final Watermark watermark;
 
-   private final CompletableFuture future;
+   WatermarkQueueEntry(@Nonnull Watermark watermark) {
 
 Review comment:
   ditto: `@Nonnull` issue.
   I guess it is the common sense for `Nonnull`. Then it is only necessary to 
tag it if `Nullable`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328060993
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
-   try {
-   if (firstSet.remove(streamElementQueueEntry)) {
-   completedQueue.offer(streamElementQueueEntry);
+   public StagedStreamRecordQueueEntry(StreamRecord 
inputRecord, Stage stage) {
+   super(inputRecord);
+   this.stage = stage;
+   }
 
-   while (firstSet.isEmpty() && firstSet != 
lastSet) {
-   firstSet = uncompletedQueue.poll();
+   @Override
+   public void complete(Collection result) {
+   super.complete(result);
+   this.stage.completed(this);
+   }
+   }
 
-   Iterator> it 
= firstSet.iterator();
+   /**
+* A stage is a collection of queue entries that can be completed in 
arbitrary order.
+*/
+   static class Stage {
+   /** Undrained finished elements. */
+   private final Queue> 
completedQueue;
 
-   while (it.hasNext()) {
-   StreamElementQueueEntry 
bufferEntry = it.next();
+   /** Finished and unfinished input elements. Used solely for 
checkpointing. */
+   private final Set> pendingElements;
 
-   if (bufferEntry.isDone()) {
-   
completedQueue.offer(bufferEntry);
-   it.remove();
-   }
-   }
-   }
+   Stage(int initialCapacity) {
+   completedQueue = new ArrayDeque<>(initialCapacity);
+   pendingElements = new HashSet<>(initialCapacity);
+   }
 
-   LOG.debug("Signal unordered stream element 
queue has completed entries.");
-   hasCompletedEntries.signalAll();
+   /**
+* Signal that an entry finished computation.
+*/
+   void completed(StreamElementQueueEntry elementQueueEntry) {
+   // adding only to completed queue if not completed 
before
+   // there may be a real result coming after the actual 
result, which is updated in the queue entry but
+   // the entry is not readded to the complete queue
+   if (pendingElements.remove(elementQueueEntry)) {
+   completedQueue.add(elementQueueEntry);
}
-   } finally {
-   lock.unlock();
}
-   }
 
-   /**
-* Add the given stream element queue entry to the current last set if 
it is not a watermark.
-* If it is a watermark, then stop adding to the current last set, 
insert the watermark into its
-* own set and add a new last set.
-*
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the stream element queue entry's result
-*/
-   private  void addEntry(StreamElementQueueEntry 
streamElementQueueEntry) {
-   assert(lock.isHeldByCurrentThread());
+   /**
+* 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328059924
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
-   try {
-   if (firstSet.remove(streamElementQueueEntry)) {
-   completedQueue.offer(streamElementQueueEntry);
+   public StagedStreamRecordQueueEntry(StreamRecord 
inputRecord, Stage stage) {
+   super(inputRecord);
+   this.stage = stage;
+   }
 
-   while (firstSet.isEmpty() && firstSet != 
lastSet) {
-   firstSet = uncompletedQueue.poll();
+   @Override
+   public void complete(Collection result) {
+   super.complete(result);
+   this.stage.completed(this);
+   }
+   }
 
-   Iterator> it 
= firstSet.iterator();
+   /**
+* A stage is a collection of queue entries that can be completed in 
arbitrary order.
+*/
+   static class Stage {
+   /** Undrained finished elements. */
+   private final Queue> 
completedQueue;
 
-   while (it.hasNext()) {
-   StreamElementQueueEntry 
bufferEntry = it.next();
+   /** Finished and unfinished input elements. Used solely for 
checkpointing. */
+   private final Set> pendingElements;
 
-   if (bufferEntry.isDone()) {
-   
completedQueue.offer(bufferEntry);
-   it.remove();
-   }
-   }
-   }
+   Stage(int initialCapacity) {
+   completedQueue = new ArrayDeque<>(initialCapacity);
+   pendingElements = new HashSet<>(initialCapacity);
+   }
 
-   LOG.debug("Signal unordered stream element 
queue has completed entries.");
-   hasCompletedEntries.signalAll();
+   /**
+* Signal that an entry finished computation.
 
 Review comment:
   nit: Signal -> Signals


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328059856
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
-   try {
-   if (firstSet.remove(streamElementQueueEntry)) {
-   completedQueue.offer(streamElementQueueEntry);
+   public StagedStreamRecordQueueEntry(StreamRecord 
inputRecord, Stage stage) {
+   super(inputRecord);
+   this.stage = stage;
+   }
 
-   while (firstSet.isEmpty() && firstSet != 
lastSet) {
-   firstSet = uncompletedQueue.poll();
+   @Override
+   public void complete(Collection result) {
+   super.complete(result);
+   this.stage.completed(this);
+   }
+   }
 
-   Iterator> it 
= firstSet.iterator();
+   /**
+* A stage is a collection of queue entries that can be completed in 
arbitrary order.
+*/
+   static class Stage {
+   /** Undrained finished elements. */
+   private final Queue> 
completedQueue;
 
-   while (it.hasNext()) {
-   StreamElementQueueEntry 
bufferEntry = it.next();
+   /** Finished and unfinished input elements. Used solely for 
checkpointing. */
 
 Review comment:
   Remove `Finished` for the comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328059752
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
-   try {
-   if (firstSet.remove(streamElementQueueEntry)) {
-   completedQueue.offer(streamElementQueueEntry);
+   public StagedStreamRecordQueueEntry(StreamRecord 
inputRecord, Stage stage) {
+   super(inputRecord);
+   this.stage = stage;
+   }
 
-   while (firstSet.isEmpty() && firstSet != 
lastSet) {
-   firstSet = uncompletedQueue.poll();
+   @Override
+   public void complete(Collection result) {
+   super.complete(result);
+   this.stage.completed(this);
+   }
+   }
 
-   Iterator> it 
= firstSet.iterator();
+   /**
+* A stage is a collection of queue entries that can be completed in 
arbitrary order.
+*/
+   static class Stage {
+   /** Undrained finished elements. */
+   private final Queue> 
completedQueue;
 
-   while (it.hasNext()) {
-   StreamElementQueueEntry 
bufferEntry = it.next();
+   /** Finished and unfinished input elements. Used solely for 
checkpointing. */
+   private final Set> pendingElements;
 
 Review comment:
   I think it is better to rename `uncompletedElements` instead, because the 
semantic `pending` in below `addPendingElements` covers both 
completed/uncompleted elements. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328056484
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
 
 Review comment:
   The previous descriptions for order/unorder issues are not very clearly in 
my thought.  Especially for `no stream record can be overtaken by a watermark 
and no watermark can overtake a stream record`, the sayings before/after `and` 
are the same.
   I think it is better to involve in the stage concept for better 
understanding. If I understood correctly, elements in one stage are kept the 
order, and elements in different stages are not in order which means overtaking 
each other.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327964157
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
+ * This means that no stream record can be overtaken by a watermark and no 
watermark can overtake a stream record.
+ * However, stream records falling in the same segment between two watermarks 
can overtake each other (their emission
+ * order is not guaranteed).
  */
 @Internal
-public class UnorderedStreamElementQueue implements StreamElementQueue {
+public final class UnorderedStreamElementQueue implements 
StreamElementQueue {
 
private static final Logger LOG = 
LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
 
/** Capacity of this queue. */
private final int capacity;
 
-   /** Executor to run the onComplete callbacks. */
-   private final Executor executor;
+   /** Queue of queue entries segmented by watermarks. */
+   private final Deque stages;
 
-   /** OperatorActions to signal the owning operator a failure. */
-   private final OperatorActions operatorActions;
-
-   /** Queue of uncompleted stream element queue entries segmented by 
watermarks. */
-   private final ArrayDeque>> 
uncompletedQueue;
-
-   /** Queue of completed stream element queue entries. */
-   private final ArrayDeque> completedQueue;
-
-   /** First (chronologically oldest) uncompleted set of stream element 
queue entries. */
-   private Set> firstSet;
-
-   // Last (chronologically youngest) uncompleted set of stream element 
queue entries. New
-   // stream element queue entries are inserted into this set.
-   private Set> lastSet;
-   private volatile int numberEntries;
-
-   /** Locks and conditions for the blocking queue. */
-   private final ReentrantLock lock;
-   private final Condition notFull;
-   private final Condition hasCompletedEntries;
-
-   public UnorderedStreamElementQueue(
-   int capacity,
-   Executor executor,
-   OperatorActions operatorActions) {
+   private int numberEntries;
 
 Review comment:
   numberEntries -> numberOfPendingEntries?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327976682
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
+ * This means that no stream record can be overtaken by a watermark and no 
watermark can overtake a stream record.
+ * However, stream records falling in the same segment between two watermarks 
can overtake each other (their emission
+ * order is not guaranteed).
  */
 @Internal
-public class UnorderedStreamElementQueue implements StreamElementQueue {
+public final class UnorderedStreamElementQueue implements 
StreamElementQueue {
 
private static final Logger LOG = 
LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
 
/** Capacity of this queue. */
private final int capacity;
 
-   /** Executor to run the onComplete callbacks. */
-   private final Executor executor;
+   /** Queue of queue entries segmented by watermarks. */
+   private final Deque stages;
 
-   /** OperatorActions to signal the owning operator a failure. */
-   private final OperatorActions operatorActions;
-
-   /** Queue of uncompleted stream element queue entries segmented by 
watermarks. */
-   private final ArrayDeque>> 
uncompletedQueue;
-
-   /** Queue of completed stream element queue entries. */
-   private final ArrayDeque> completedQueue;
-
-   /** First (chronologically oldest) uncompleted set of stream element 
queue entries. */
-   private Set> firstSet;
-
-   // Last (chronologically youngest) uncompleted set of stream element 
queue entries. New
-   // stream element queue entries are inserted into this set.
-   private Set> lastSet;
-   private volatile int numberEntries;
-
-   /** Locks and conditions for the blocking queue. */
-   private final ReentrantLock lock;
-   private final Condition notFull;
-   private final Condition hasCompletedEntries;
-
-   public UnorderedStreamElementQueue(
-   int capacity,
-   Executor executor,
-   OperatorActions operatorActions) {
+   private int numberEntries;
 
+   public UnorderedStreamElementQueue(int capacity) {
Preconditions.checkArgument(capacity > 0, "The capacity must be 
larger than 0.");
-   this.capacity = capacity;
-
-   this.executor = Preconditions.checkNotNull(executor, 
"executor");
-
-   this.operatorActions = 
Preconditions.checkNotNull(operatorActions, "operatorActions");
-
-   this.uncompletedQueue = new ArrayDeque<>(capacity);
-   this.completedQueue = new ArrayDeque<>(capacity);
-
-   this.firstSet = new HashSet<>(capacity);
-   this.lastSet = firstSet;
 
+   this.capacity = capacity;
+   // most likely scenario are 4 stages 
+   this.stages = new ArrayDeque<>(4);
this.numberEntries = 0;
-
- 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327976571
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
+ * This means that no stream record can be overtaken by a watermark and no 
watermark can overtake a stream record.
+ * However, stream records falling in the same segment between two watermarks 
can overtake each other (their emission
+ * order is not guaranteed).
  */
 @Internal
-public class UnorderedStreamElementQueue implements StreamElementQueue {
+public final class UnorderedStreamElementQueue implements 
StreamElementQueue {
 
private static final Logger LOG = 
LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
 
/** Capacity of this queue. */
private final int capacity;
 
-   /** Executor to run the onComplete callbacks. */
-   private final Executor executor;
+   /** Queue of queue entries segmented by watermarks. */
+   private final Deque stages;
 
-   /** OperatorActions to signal the owning operator a failure. */
-   private final OperatorActions operatorActions;
-
-   /** Queue of uncompleted stream element queue entries segmented by 
watermarks. */
-   private final ArrayDeque>> 
uncompletedQueue;
-
-   /** Queue of completed stream element queue entries. */
-   private final ArrayDeque> completedQueue;
-
-   /** First (chronologically oldest) uncompleted set of stream element 
queue entries. */
-   private Set> firstSet;
-
-   // Last (chronologically youngest) uncompleted set of stream element 
queue entries. New
-   // stream element queue entries are inserted into this set.
-   private Set> lastSet;
-   private volatile int numberEntries;
-
-   /** Locks and conditions for the blocking queue. */
-   private final ReentrantLock lock;
-   private final Condition notFull;
-   private final Condition hasCompletedEntries;
-
-   public UnorderedStreamElementQueue(
-   int capacity,
-   Executor executor,
-   OperatorActions operatorActions) {
+   private int numberEntries;
 
+   public UnorderedStreamElementQueue(int capacity) {
Preconditions.checkArgument(capacity > 0, "The capacity must be 
larger than 0.");
-   this.capacity = capacity;
-
-   this.executor = Preconditions.checkNotNull(executor, 
"executor");
-
-   this.operatorActions = 
Preconditions.checkNotNull(operatorActions, "operatorActions");
-
-   this.uncompletedQueue = new ArrayDeque<>(capacity);
-   this.completedQueue = new ArrayDeque<>(capacity);
-
-   this.firstSet = new HashSet<>(capacity);
-   this.lastSet = firstSet;
 
+   this.capacity = capacity;
+   // most likely scenario are 4 stages 
+   this.stages = new ArrayDeque<>(4);
this.numberEntries = 0;
-
- 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327976364
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
+ * This means that no stream record can be overtaken by a watermark and no 
watermark can overtake a stream record.
+ * However, stream records falling in the same segment between two watermarks 
can overtake each other (their emission
+ * order is not guaranteed).
  */
 @Internal
-public class UnorderedStreamElementQueue implements StreamElementQueue {
+public final class UnorderedStreamElementQueue implements 
StreamElementQueue {
 
private static final Logger LOG = 
LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
 
/** Capacity of this queue. */
private final int capacity;
 
-   /** Executor to run the onComplete callbacks. */
-   private final Executor executor;
+   /** Queue of queue entries segmented by watermarks. */
+   private final Deque stages;
 
 Review comment:
   `Deque>`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327976364
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
+ * This means that no stream record can be overtaken by a watermark and no 
watermark can overtake a stream record.
+ * However, stream records falling in the same segment between two watermarks 
can overtake each other (their emission
+ * order is not guaranteed).
  */
 @Internal
-public class UnorderedStreamElementQueue implements StreamElementQueue {
+public final class UnorderedStreamElementQueue implements 
StreamElementQueue {
 
private static final Logger LOG = 
LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
 
/** Capacity of this queue. */
private final int capacity;
 
-   /** Executor to run the onComplete callbacks. */
-   private final Executor executor;
+   /** Queue of queue entries segmented by watermarks. */
+   private final Deque stages;
 
 Review comment:
   Deque>


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327975397
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
+ * This means that no stream record can be overtaken by a watermark and no 
watermark can overtake a stream record.
+ * However, stream records falling in the same segment between two watermarks 
can overtake each other (their emission
+ * order is not guaranteed).
  */
 @Internal
-public class UnorderedStreamElementQueue implements StreamElementQueue {
+public final class UnorderedStreamElementQueue implements 
StreamElementQueue {
 
private static final Logger LOG = 
LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
 
/** Capacity of this queue. */
private final int capacity;
 
-   /** Executor to run the onComplete callbacks. */
-   private final Executor executor;
+   /** Queue of queue entries segmented by watermarks. */
+   private final Deque stages;
 
-   /** OperatorActions to signal the owning operator a failure. */
-   private final OperatorActions operatorActions;
-
-   /** Queue of uncompleted stream element queue entries segmented by 
watermarks. */
-   private final ArrayDeque>> 
uncompletedQueue;
-
-   /** Queue of completed stream element queue entries. */
-   private final ArrayDeque> completedQueue;
-
-   /** First (chronologically oldest) uncompleted set of stream element 
queue entries. */
-   private Set> firstSet;
-
-   // Last (chronologically youngest) uncompleted set of stream element 
queue entries. New
-   // stream element queue entries are inserted into this set.
-   private Set> lastSet;
-   private volatile int numberEntries;
-
-   /** Locks and conditions for the blocking queue. */
-   private final ReentrantLock lock;
-   private final Condition notFull;
-   private final Condition hasCompletedEntries;
-
-   public UnorderedStreamElementQueue(
-   int capacity,
-   Executor executor,
-   OperatorActions operatorActions) {
+   private int numberEntries;
 
+   public UnorderedStreamElementQueue(int capacity) {
Preconditions.checkArgument(capacity > 0, "The capacity must be 
larger than 0.");
-   this.capacity = capacity;
-
-   this.executor = Preconditions.checkNotNull(executor, 
"executor");
-
-   this.operatorActions = 
Preconditions.checkNotNull(operatorActions, "operatorActions");
-
-   this.uncompletedQueue = new ArrayDeque<>(capacity);
-   this.completedQueue = new ArrayDeque<>(capacity);
-
-   this.firstSet = new HashSet<>(capacity);
-   this.lastSet = firstSet;
 
+   this.capacity = capacity;
+   // most likely scenario are 4 stages 
+   this.stages = new ArrayDeque<>(4);
this.numberEntries = 0;
-
- 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327975499
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
+ * This means that no stream record can be overtaken by a watermark and no 
watermark can overtake a stream record.
+ * However, stream records falling in the same segment between two watermarks 
can overtake each other (their emission
+ * order is not guaranteed).
  */
 @Internal
-public class UnorderedStreamElementQueue implements StreamElementQueue {
+public final class UnorderedStreamElementQueue implements 
StreamElementQueue {
 
private static final Logger LOG = 
LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
 
/** Capacity of this queue. */
private final int capacity;
 
-   /** Executor to run the onComplete callbacks. */
-   private final Executor executor;
+   /** Queue of queue entries segmented by watermarks. */
+   private final Deque stages;
 
-   /** OperatorActions to signal the owning operator a failure. */
-   private final OperatorActions operatorActions;
-
-   /** Queue of uncompleted stream element queue entries segmented by 
watermarks. */
-   private final ArrayDeque>> 
uncompletedQueue;
-
-   /** Queue of completed stream element queue entries. */
-   private final ArrayDeque> completedQueue;
-
-   /** First (chronologically oldest) uncompleted set of stream element 
queue entries. */
-   private Set> firstSet;
-
-   // Last (chronologically youngest) uncompleted set of stream element 
queue entries. New
-   // stream element queue entries are inserted into this set.
-   private Set> lastSet;
-   private volatile int numberEntries;
-
-   /** Locks and conditions for the blocking queue. */
-   private final ReentrantLock lock;
-   private final Condition notFull;
-   private final Condition hasCompletedEntries;
-
-   public UnorderedStreamElementQueue(
-   int capacity,
-   Executor executor,
-   OperatorActions operatorActions) {
+   private int numberEntries;
 
+   public UnorderedStreamElementQueue(int capacity) {
Preconditions.checkArgument(capacity > 0, "The capacity must be 
larger than 0.");
-   this.capacity = capacity;
-
-   this.executor = Preconditions.checkNotNull(executor, 
"executor");
-
-   this.operatorActions = 
Preconditions.checkNotNull(operatorActions, "operatorActions");
-
-   this.uncompletedQueue = new ArrayDeque<>(capacity);
-   this.completedQueue = new ArrayDeque<>(capacity);
-
-   this.firstSet = new HashSet<>(capacity);
-   this.lastSet = firstSet;
 
+   this.capacity = capacity;
+   // most likely scenario are 4 stages 
+   this.stages = new ArrayDeque<>(4);
this.numberEntries = 0;
-
- 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327974424
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
-   try {
-   if (firstSet.remove(streamElementQueueEntry)) {
-   completedQueue.offer(streamElementQueueEntry);
+   public StagedStreamRecordQueueEntry(StreamRecord 
inputRecord, Stage stage) {
+   super(inputRecord);
+   this.stage = stage;
+   }
 
-   while (firstSet.isEmpty() && firstSet != 
lastSet) {
-   firstSet = uncompletedQueue.poll();
+   @Override
+   public void complete(Collection result) {
+   super.complete(result);
+   this.stage.completed(this);
+   }
+   }
 
-   Iterator> it 
= firstSet.iterator();
+   /**
+* A stage is a collection of queue entries that can be completed in 
arbitrary order.
+*/
+   static class Stage {
+   /** Undrained finished elements. */
+   private final Queue> 
completedQueue;
 
-   while (it.hasNext()) {
-   StreamElementQueueEntry 
bufferEntry = it.next();
+   /** Finished and unfinished input elements. Used solely for 
checkpointing. */
+   private final Set> pendingElements;
 
-   if (bufferEntry.isDone()) {
-   
completedQueue.offer(bufferEntry);
-   it.remove();
-   }
-   }
-   }
+   Stage(int initialCapacity) {
+   completedQueue = new ArrayDeque<>(initialCapacity);
+   pendingElements = new HashSet<>(initialCapacity);
+   }
 
-   LOG.debug("Signal unordered stream element 
queue has completed entries.");
-   hasCompletedEntries.signalAll();
+   /**
+* Signal that an entry finished computation.
+*/
+   void completed(StreamElementQueueEntry elementQueueEntry) {
+   // adding only to completed queue if not completed 
before
+   // there may be a real result coming after the actual 
result, which is updated in the queue entry but
+   // the entry is not readded to the complete queue
+   if (pendingElements.remove(elementQueueEntry)) {
+   completedQueue.add(elementQueueEntry);
}
-   } finally {
-   lock.unlock();
}
-   }
 
-   /**
-* Add the given stream element queue entry to the current last set if 
it is not a watermark.
-* If it is a watermark, then stop adding to the current last set, 
insert the watermark into its
-* own set and add a new last set.
-*
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the stream element queue entry's result
-*/
-   private  void addEntry(StreamElementQueueEntry 
streamElementQueueEntry) {
-   assert(lock.isHeldByCurrentThread());
+   /**
+* 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327974182
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
-   try {
-   if (firstSet.remove(streamElementQueueEntry)) {
-   completedQueue.offer(streamElementQueueEntry);
+   public StagedStreamRecordQueueEntry(StreamRecord 
inputRecord, Stage stage) {
+   super(inputRecord);
+   this.stage = stage;
+   }
 
-   while (firstSet.isEmpty() && firstSet != 
lastSet) {
-   firstSet = uncompletedQueue.poll();
+   @Override
+   public void complete(Collection result) {
+   super.complete(result);
+   this.stage.completed(this);
+   }
+   }
 
-   Iterator> it 
= firstSet.iterator();
+   /**
+* A stage is a collection of queue entries that can be completed in 
arbitrary order.
+*/
+   static class Stage {
+   /** Undrained finished elements. */
+   private final Queue> 
completedQueue;
 
-   while (it.hasNext()) {
-   StreamElementQueueEntry 
bufferEntry = it.next();
+   /** Finished and unfinished input elements. Used solely for 
checkpointing. */
+   private final Set> pendingElements;
 
-   if (bufferEntry.isDone()) {
-   
completedQueue.offer(bufferEntry);
-   it.remove();
-   }
-   }
-   }
+   Stage(int initialCapacity) {
+   completedQueue = new ArrayDeque<>(initialCapacity);
+   pendingElements = new HashSet<>(initialCapacity);
+   }
 
-   LOG.debug("Signal unordered stream element 
queue has completed entries.");
-   hasCompletedEntries.signalAll();
+   /**
+* Signal that an entry finished computation.
+*/
+   void completed(StreamElementQueueEntry elementQueueEntry) {
+   // adding only to completed queue if not completed 
before
+   // there may be a real result coming after the actual 
result, which is updated in the queue entry but
+   // the entry is not readded to the complete queue
 
 Review comment:
   typo: readded


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327973730
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
 Review comment:
   `Stage`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327973356
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
-   try {
-   if (firstSet.remove(streamElementQueueEntry)) {
-   completedQueue.offer(streamElementQueueEntry);
+   public StagedStreamRecordQueueEntry(StreamRecord 
inputRecord, Stage stage) {
 
 Review comment:
   package private


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327973260
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
-   try {
-   if (firstSet.remove(streamElementQueueEntry)) {
-   completedQueue.offer(streamElementQueueEntry);
+   public StagedStreamRecordQueueEntry(StreamRecord 
inputRecord, Stage stage) {
+   super(inputRecord);
+   this.stage = stage;
+   }
 
-   while (firstSet.isEmpty() && firstSet != 
lastSet) {
-   firstSet = uncompletedQueue.poll();
+   @Override
+   public void complete(Collection result) {
+   super.complete(result);
+   this.stage.completed(this);
 
 Review comment:
   remove `this`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327970135
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -218,88 +172,99 @@ public int size() {
}
 
/**
-* Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-* entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-* case, then the element is added to the completed entries queue from 
where it can be consumed.
-* If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-* queue. Completed entries from this new set are then added to the 
completed entries queue.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread has been 
interrupted while performing the
-*  on complete callback.
+* An entry that notifies the respective stage upon completion.
 */
-   public void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
+   static class StagedStreamRecordQueueEntry extends 
StreamRecordQueueEntry {
+   private final Stage stage;
 
-   try {
-   if (firstSet.remove(streamElementQueueEntry)) {
-   completedQueue.offer(streamElementQueueEntry);
+   public StagedStreamRecordQueueEntry(StreamRecord 
inputRecord, Stage stage) {
+   super(inputRecord);
+   this.stage = stage;
+   }
 
-   while (firstSet.isEmpty() && firstSet != 
lastSet) {
-   firstSet = uncompletedQueue.poll();
+   @Override
+   public void complete(Collection result) {
+   super.complete(result);
+   this.stage.completed(this);
+   }
+   }
 
-   Iterator> it 
= firstSet.iterator();
+   /**
+* A stage is a collection of queue entries that can be completed in 
arbitrary order.
+*/
+   static class Stage {
+   /** Undrained finished elements. */
+   private final Queue> 
completedQueue;
 
 Review comment:
   completedQueue -> completedElements


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327964157
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##
 @@ -19,192 +19,146 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue
- * emits asynchronous results as soon as they are completed. Additionally it 
maintains the
- * watermark-stream record order. This means that no stream record can be 
overtaken by a watermark
- * and no watermark can overtake a stream record. However, stream records 
falling in the same
- * segment between two watermarks can overtake each other (their emission 
order is not guaranteed).
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered 
stream element queue provides
+ * asynchronous results as soon as they are completed. Additionally it 
maintains the watermark-stream record order.
+ * This means that no stream record can be overtaken by a watermark and no 
watermark can overtake a stream record.
+ * However, stream records falling in the same segment between two watermarks 
can overtake each other (their emission
+ * order is not guaranteed).
  */
 @Internal
-public class UnorderedStreamElementQueue implements StreamElementQueue {
+public final class UnorderedStreamElementQueue implements 
StreamElementQueue {
 
private static final Logger LOG = 
LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
 
/** Capacity of this queue. */
private final int capacity;
 
-   /** Executor to run the onComplete callbacks. */
-   private final Executor executor;
+   /** Queue of queue entries segmented by watermarks. */
+   private final Deque stages;
 
-   /** OperatorActions to signal the owning operator a failure. */
-   private final OperatorActions operatorActions;
-
-   /** Queue of uncompleted stream element queue entries segmented by 
watermarks. */
-   private final ArrayDeque>> 
uncompletedQueue;
-
-   /** Queue of completed stream element queue entries. */
-   private final ArrayDeque> completedQueue;
-
-   /** First (chronologically oldest) uncompleted set of stream element 
queue entries. */
-   private Set> firstSet;
-
-   // Last (chronologically youngest) uncompleted set of stream element 
queue entries. New
-   // stream element queue entries are inserted into this set.
-   private Set> lastSet;
-   private volatile int numberEntries;
-
-   /** Locks and conditions for the blocking queue. */
-   private final ReentrantLock lock;
-   private final Condition notFull;
-   private final Condition hasCompletedEntries;
-
-   public UnorderedStreamElementQueue(
-   int capacity,
-   Executor executor,
-   OperatorActions operatorActions) {
+   private int numberEntries;
 
 Review comment:
   numberEntries -> numberOfEntries?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327962020
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
 ##
 @@ -21,65 +21,55 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
 
 import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. 
This class also acts
  * as the {@link ResultFuture} implementation which is given to the {@link 
AsyncFunction}. The
  * async function completes this class with a collection of results.
  *
- * @param  Type of the asynchronous collection result
+ * @param  Type of the asynchronous collection result.
  */
 @Internal
-public class StreamRecordQueueEntry extends 
StreamElementQueueEntry>
-   implements AsyncCollectionResult, ResultFuture {
-
-   /** Timestamp information. */
-   private final boolean hasTimestamp;
-   private final long timestamp;
-
-   /** Future containing the collection result. */
-   private final CompletableFuture> resultFuture;
+class StreamRecordQueueEntry implements StreamElementQueueEntry {
+   private Collection completed;
 
-   public StreamRecordQueueEntry(StreamRecord streamRecord) {
-   super(streamRecord);
+   @Nonnull
+   private final StreamRecord inputRecord;
 
-   hasTimestamp = streamRecord.hasTimestamp();
-   timestamp = streamRecord.getTimestamp();
-
-   resultFuture = new CompletableFuture<>();
+   StreamRecordQueueEntry(@Nonnull StreamRecord inputRecord) {
+   this.inputRecord = Preconditions.checkNotNull(inputRecord);
}
 
@Override
-   public boolean hasTimestamp() {
-   return hasTimestamp;
+   public boolean isDone() {
+   return completed != null;
}
 
+   @Nonnull
@Override
-   public long getTimestamp() {
-   return timestamp;
+   public StreamRecord getInputElement() {
+   return inputRecord;
}
 
@Override
-   public Collection get() throws Exception {
-   return resultFuture.get();
-   }
+   public void emitResult(TimestampedCollector output) {
+   Preconditions.checkState(completed != null, "Not done yet");
 
-   @Override
-   protected CompletableFuture> getFuture() {
-   return resultFuture;
+   output.setTimestamp(this.inputRecord);
+   for (OUT r : completed) {
+   output.collect(r);
+   }
}
 
@Override
public void complete(Collection result) {
-   resultFuture.complete(result);
-   }
-
-   @Override
-   public void completeExceptionally(Throwable error) {
-   resultFuture.completeExceptionally(error);
+   this.completed = result;
 
 Review comment:
   ditto: `this`. Do we need to `checkNotNull(result)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327962005
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
 ##
 @@ -21,65 +21,55 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
 
 import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. 
This class also acts
  * as the {@link ResultFuture} implementation which is given to the {@link 
AsyncFunction}. The
  * async function completes this class with a collection of results.
  *
- * @param  Type of the asynchronous collection result
+ * @param  Type of the asynchronous collection result.
  */
 @Internal
-public class StreamRecordQueueEntry extends 
StreamElementQueueEntry>
-   implements AsyncCollectionResult, ResultFuture {
-
-   /** Timestamp information. */
-   private final boolean hasTimestamp;
-   private final long timestamp;
-
-   /** Future containing the collection result. */
-   private final CompletableFuture> resultFuture;
+class StreamRecordQueueEntry implements StreamElementQueueEntry {
+   private Collection completed;
 
-   public StreamRecordQueueEntry(StreamRecord streamRecord) {
-   super(streamRecord);
+   @Nonnull
+   private final StreamRecord inputRecord;
 
-   hasTimestamp = streamRecord.hasTimestamp();
-   timestamp = streamRecord.getTimestamp();
-
-   resultFuture = new CompletableFuture<>();
+   StreamRecordQueueEntry(@Nonnull StreamRecord inputRecord) {
+   this.inputRecord = Preconditions.checkNotNull(inputRecord);
}
 
@Override
-   public boolean hasTimestamp() {
-   return hasTimestamp;
+   public boolean isDone() {
+   return completed != null;
}
 
+   @Nonnull
@Override
-   public long getTimestamp() {
-   return timestamp;
+   public StreamRecord getInputElement() {
+   return inputRecord;
}
 
@Override
-   public Collection get() throws Exception {
-   return resultFuture.get();
-   }
+   public void emitResult(TimestampedCollector output) {
+   Preconditions.checkState(completed != null, "Not done yet");
 
-   @Override
-   protected CompletableFuture> getFuture() {
-   return resultFuture;
+   output.setTimestamp(this.inputRecord);
 
 Review comment:
   Remove `this`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327961954
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
 ##
 @@ -21,65 +21,55 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
 
 import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. 
This class also acts
  * as the {@link ResultFuture} implementation which is given to the {@link 
AsyncFunction}. The
  * async function completes this class with a collection of results.
  *
- * @param  Type of the asynchronous collection result
+ * @param  Type of the asynchronous collection result.
  */
 @Internal
-public class StreamRecordQueueEntry extends 
StreamElementQueueEntry>
-   implements AsyncCollectionResult, ResultFuture {
-
-   /** Timestamp information. */
-   private final boolean hasTimestamp;
-   private final long timestamp;
-
-   /** Future containing the collection result. */
-   private final CompletableFuture> resultFuture;
+class StreamRecordQueueEntry implements StreamElementQueueEntry {
+   private Collection completed;
 
 Review comment:
   It is better to give a more descriptive naming for `completed` for better 
understanding in following relevant processes. 
`completedOutputs/completedResults`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327958374
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
 ##
 @@ -21,65 +21,55 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
 
 import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. 
This class also acts
  * as the {@link ResultFuture} implementation which is given to the {@link 
AsyncFunction}. The
  * async function completes this class with a collection of results.
  *
- * @param  Type of the asynchronous collection result
+ * @param  Type of the asynchronous collection result.
  */
 @Internal
-public class StreamRecordQueueEntry extends 
StreamElementQueueEntry>
-   implements AsyncCollectionResult, ResultFuture {
-
-   /** Timestamp information. */
-   private final boolean hasTimestamp;
-   private final long timestamp;
-
-   /** Future containing the collection result. */
-   private final CompletableFuture> resultFuture;
+class StreamRecordQueueEntry implements StreamElementQueueEntry {
+   private Collection completed;
 
-   public StreamRecordQueueEntry(StreamRecord streamRecord) {
-   super(streamRecord);
+   @Nonnull
+   private final StreamRecord inputRecord;
 
-   hasTimestamp = streamRecord.hasTimestamp();
-   timestamp = streamRecord.getTimestamp();
-
-   resultFuture = new CompletableFuture<>();
+   StreamRecordQueueEntry(@Nonnull StreamRecord inputRecord) {
 
 Review comment:
   We could remove `@Nonnull` because the below `Preconditions.checkNotNull` 
has the same effect.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327957126
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
 ##
 @@ -19,79 +19,47 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.util.Preconditions;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
+import javax.annotation.Nonnull;
 
 /**
- * Entry class for the {@link StreamElementQueue}. The stream element queue 
entry stores the
- * {@link StreamElement} for which the stream element queue entry has been 
instantiated.
- * Furthermore, it allows to register callbacks for when the queue entry is 
completed.
- *
- * @param  Type of the result
+ * An entry for the {@link StreamElementQueue}. The stream element queue entry 
stores the {@link StreamElement} for
+ * which the stream element queue entry has been instantiated.
+ * Furthermore, it allows to set the result of a completed entry through 
{@link ResultFuture}.
  */
 @Internal
-public abstract class StreamElementQueueEntry implements AsyncResult {
-
-   private final StreamElement streamElement;
-
-   public StreamElementQueueEntry(StreamElement streamElement) {
-   this.streamElement = Preconditions.checkNotNull(streamElement);
-   }
-
-   public StreamElement getStreamElement() {
-   return streamElement;
-   }
+interface StreamElementQueueEntry extends ResultFuture {
 
/**
 * True if the stream element queue entry has been completed; otherwise 
false.
 *
 * @return True if the stream element queue entry has been completed; 
otherwise false.
 */
-   public boolean isDone() {
-   return getFuture().isDone();
-   }
+   boolean isDone();
 
/**
-* Register the given complete function to be called once this queue 
entry has been completed.
+* Emits the results associated with this queue entry.
 *
-* @param completeFunction to call when the queue entry has been 
completed
-* @param executor to run the complete function
+* @param output the output into which to emit.
+* @throws IllegalStateException if this entry is not done yet.
 
 Review comment:
   We can not see this exception explicitly, so might not need this javadoc


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954961
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##
 @@ -19,68 +19,56 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for a non-blocking stream element queues for the {@link 
AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue {
 
/**
-* Put the given element in the queue if capacity is left. If not, then 
block until this is
-* the case.
+* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
+* the queue is full.
 *
-* @param streamElementQueueEntry to be put into the queue
-* @param  Type of the entries future value
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
-*/
-void put(StreamElementQueueEntry streamElementQueueEntry) throws 
InterruptedException;
-
-   /**
-* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity
-* left and fails if the queue is full.
+* This method returns a handle to the inserted element that allows 
to set the result of the computation.
 *
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the entries future value
-* @return True if the entry could be inserted; otherwise false
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
+* @param streamElement the element to be inserted.
+* @return A handle to the element if successful or {@link 
Optional#empty()} otherwise.
 */
-boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException;
+   Optional> tryPut(StreamElement streamElement);
 
/**
-* Peek at the head of the queue and return the first completed {@link 
AsyncResult}. This
-* operation is a blocking operation and only returns once a completed 
async result has been
-* found.
+* Emits one completed element from the head of this queue into the 
given output.
 *
-* @return Completed {@link AsyncResult}
-* @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-*  completed async result.
+* Will not emit any element if no element has been completed (check 
{@link #hasCompleted()} before entering
+* any critical section).
 
 Review comment:
   ditto: ``


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327955120
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##
 @@ -19,68 +19,56 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for a non-blocking stream element queues for the {@link 
AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue {
 
/**
-* Put the given element in the queue if capacity is left. If not, then 
block until this is
-* the case.
+* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
+* the queue is full.
 *
-* @param streamElementQueueEntry to be put into the queue
-* @param  Type of the entries future value
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
-*/
-void put(StreamElementQueueEntry streamElementQueueEntry) throws 
InterruptedException;
-
-   /**
-* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity
-* left and fails if the queue is full.
+* This method returns a handle to the inserted element that allows 
to set the result of the computation.
 *
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the entries future value
-* @return True if the entry could be inserted; otherwise false
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
+* @param streamElement the element to be inserted.
+* @return A handle to the element if successful or {@link 
Optional#empty()} otherwise.
 */
-boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException;
+   Optional> tryPut(StreamElement streamElement);
 
/**
-* Peek at the head of the queue and return the first completed {@link 
AsyncResult}. This
-* operation is a blocking operation and only returns once a completed 
async result has been
-* found.
+* Emits one completed element from the head of this queue into the 
given output.
 *
-* @return Completed {@link AsyncResult}
-* @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-*  completed async result.
+* Will not emit any element if no element has been completed (check 
{@link #hasCompleted()} before entering
+* any critical section).
+*
+* @param output the output into which to emit
 */
-   AsyncResult peekBlockingly() throws InterruptedException;
+   void emitCompleted(TimestampedCollector output);
 
/**
-* Poll the first completed {@link AsyncResult} from the head of this 
queue. This operation is
-* blocking and only returns once a completed async result has been 
found.
+* Checks if there is at least one completed element to be emitted.
 *
-* @return Completed {@link AsyncResult} which has been removed from 
the queue
-* @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-*  completed async result.
+* @return true if there is an element to be emitted.
 */
-   AsyncResult poll() throws InterruptedException;
+   boolean hasCompleted();
 
/**
-* Return the collection of {@link StreamElementQueueEntry} currently 
contained in this queue.
+* Returns the collection of {@link StreamElement} currently contained 
in this queue for checkpointing.
+*
+* This includes all non-emitted, completed and non-completed 
elements.
 
 Review comment:
   ditto: ``


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327955101
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##
 @@ -19,68 +19,56 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for a non-blocking stream element queues for the {@link 
AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue {
 
/**
-* Put the given element in the queue if capacity is left. If not, then 
block until this is
-* the case.
+* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
+* the queue is full.
 *
-* @param streamElementQueueEntry to be put into the queue
-* @param  Type of the entries future value
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
-*/
-void put(StreamElementQueueEntry streamElementQueueEntry) throws 
InterruptedException;
-
-   /**
-* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity
-* left and fails if the queue is full.
+* This method returns a handle to the inserted element that allows 
to set the result of the computation.
 *
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the entries future value
-* @return True if the entry could be inserted; otherwise false
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
+* @param streamElement the element to be inserted.
+* @return A handle to the element if successful or {@link 
Optional#empty()} otherwise.
 */
-boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException;
+   Optional> tryPut(StreamElement streamElement);
 
/**
-* Peek at the head of the queue and return the first completed {@link 
AsyncResult}. This
-* operation is a blocking operation and only returns once a completed 
async result has been
-* found.
+* Emits one completed element from the head of this queue into the 
given output.
 *
-* @return Completed {@link AsyncResult}
-* @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-*  completed async result.
+* Will not emit any element if no element has been completed (check 
{@link #hasCompleted()} before entering
+* any critical section).
+*
+* @param output the output into which to emit
 */
-   AsyncResult peekBlockingly() throws InterruptedException;
+   void emitCompleted(TimestampedCollector output);
 
/**
-* Poll the first completed {@link AsyncResult} from the head of this 
queue. This operation is
-* blocking and only returns once a completed async result has been 
found.
+* Checks if there is at least one completed element to be emitted.
 *
-* @return Completed {@link AsyncResult} which has been removed from 
the queue
-* @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-*  completed async result.
+* @return true if there is an element to be emitted.
 
 Review comment:
   `true if there is a completed element.`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327955120
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##
 @@ -19,68 +19,56 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for a non-blocking stream element queues for the {@link 
AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue {
 
/**
-* Put the given element in the queue if capacity is left. If not, then 
block until this is
-* the case.
+* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
+* the queue is full.
 *
-* @param streamElementQueueEntry to be put into the queue
-* @param  Type of the entries future value
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
-*/
-void put(StreamElementQueueEntry streamElementQueueEntry) throws 
InterruptedException;
-
-   /**
-* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity
-* left and fails if the queue is full.
+* This method returns a handle to the inserted element that allows 
to set the result of the computation.
 *
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the entries future value
-* @return True if the entry could be inserted; otherwise false
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
+* @param streamElement the element to be inserted.
+* @return A handle to the element if successful or {@link 
Optional#empty()} otherwise.
 */
-boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException;
+   Optional> tryPut(StreamElement streamElement);
 
/**
-* Peek at the head of the queue and return the first completed {@link 
AsyncResult}. This
-* operation is a blocking operation and only returns once a completed 
async result has been
-* found.
+* Emits one completed element from the head of this queue into the 
given output.
 *
-* @return Completed {@link AsyncResult}
-* @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-*  completed async result.
+* Will not emit any element if no element has been completed (check 
{@link #hasCompleted()} before entering
+* any critical section).
+*
+* @param output the output into which to emit
 */
-   AsyncResult peekBlockingly() throws InterruptedException;
+   void emitCompleted(TimestampedCollector output);
 
/**
-* Poll the first completed {@link AsyncResult} from the head of this 
queue. This operation is
-* blocking and only returns once a completed async result has been 
found.
+* Checks if there is at least one completed element to be emitted.
 *
-* @return Completed {@link AsyncResult} which has been removed from 
the queue
-* @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-*  completed async result.
+* @return true if there is an element to be emitted.
 */
-   AsyncResult poll() throws InterruptedException;
+   boolean hasCompleted();
 
/**
-* Return the collection of {@link StreamElementQueueEntry} currently 
contained in this queue.
+* Returns the collection of {@link StreamElement} currently contained 
in this queue for checkpointing.
+*
+* This includes all non-emitted, completed and non-completed 
elements.
 
 Review comment:
   ditto: 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954988
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##
 @@ -19,68 +19,56 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for a non-blocking stream element queues for the {@link 
AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue {
 
/**
-* Put the given element in the queue if capacity is left. If not, then 
block until this is
-* the case.
+* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
+* the queue is full.
 *
-* @param streamElementQueueEntry to be put into the queue
-* @param  Type of the entries future value
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
-*/
-void put(StreamElementQueueEntry streamElementQueueEntry) throws 
InterruptedException;
-
-   /**
-* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity
-* left and fails if the queue is full.
+* This method returns a handle to the inserted element that allows 
to set the result of the computation.
 *
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the entries future value
-* @return True if the entry could be inserted; otherwise false
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
+* @param streamElement the element to be inserted.
+* @return A handle to the element if successful or {@link 
Optional#empty()} otherwise.
 */
-boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException;
+   Optional> tryPut(StreamElement streamElement);
 
/**
-* Peek at the head of the queue and return the first completed {@link 
AsyncResult}. This
-* operation is a blocking operation and only returns once a completed 
async result has been
-* found.
+* Emits one completed element from the head of this queue into the 
given output.
 *
-* @return Completed {@link AsyncResult}
-* @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-*  completed async result.
+* Will not emit any element if no element has been completed (check 
{@link #hasCompleted()} before entering
+* any critical section).
+*
+* @param output the output into which to emit
 */
-   AsyncResult peekBlockingly() throws InterruptedException;
+   void emitCompleted(TimestampedCollector output);
 
/**
-* Poll the first completed {@link AsyncResult} from the head of this 
queue. This operation is
-* blocking and only returns once a completed async result has been 
found.
+* Checks if there is at least one completed element to be emitted.
 
 Review comment:
   I think it is better to remove "to be emitted". `hasCompleted()` should not 
know/care of the purpose of upper caller whether to emit or not. In other 
words, if `hasCompleted()` is defined only used for  `emitCompleted`, we could 
judge it implicitly inside `emitCompleted`, no need to define an explicit 
interface method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954961
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##
 @@ -19,68 +19,56 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for a non-blocking stream element queues for the {@link 
AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue {
 
/**
-* Put the given element in the queue if capacity is left. If not, then 
block until this is
-* the case.
+* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
+* the queue is full.
 *
-* @param streamElementQueueEntry to be put into the queue
-* @param  Type of the entries future value
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
-*/
-void put(StreamElementQueueEntry streamElementQueueEntry) throws 
InterruptedException;
-
-   /**
-* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity
-* left and fails if the queue is full.
+* This method returns a handle to the inserted element that allows 
to set the result of the computation.
 *
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the entries future value
-* @return True if the entry could be inserted; otherwise false
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
+* @param streamElement the element to be inserted.
+* @return A handle to the element if successful or {@link 
Optional#empty()} otherwise.
 */
-boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException;
+   Optional> tryPut(StreamElement streamElement);
 
/**
-* Peek at the head of the queue and return the first completed {@link 
AsyncResult}. This
-* operation is a blocking operation and only returns once a completed 
async result has been
-* found.
+* Emits one completed element from the head of this queue into the 
given output.
 *
-* @return Completed {@link AsyncResult}
-* @throws InterruptedException if the current thread has been 
interrupted while waiting for a
-*  completed async result.
+* Will not emit any element if no element has been completed (check 
{@link #hasCompleted()} before entering
+* any critical section).
 
 Review comment:
   ditto: 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954939
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##
 @@ -19,68 +19,56 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for a non-blocking stream element queues for the {@link 
AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue {
 
/**
-* Put the given element in the queue if capacity is left. If not, then 
block until this is
-* the case.
+* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
+* the queue is full.
 *
-* @param streamElementQueueEntry to be put into the queue
-* @param  Type of the entries future value
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
-*/
-void put(StreamElementQueueEntry streamElementQueueEntry) throws 
InterruptedException;
-
-   /**
-* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity
-* left and fails if the queue is full.
+* This method returns a handle to the inserted element that allows 
to set the result of the computation.
 
 Review comment:
   I guess we do not need the tail  before in code formatting rule. 
Anything changed now?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954620
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 ##
 @@ -19,117 +19,66 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 
 /**
- * Ordered {@link StreamElementQueue} implementation. The ordered stream 
element queue emits
+ * Ordered {@link StreamElementQueue} implementation. The ordered stream 
element queue provides
  * asynchronous results in the order in which the {@link 
StreamElementQueueEntry} have been added
  * to the queue. Thus, even if the completion order can be arbitrary, the 
output order strictly
  * follows the insertion order (element cannot overtake each other).
  */
 @Internal
-public class OrderedStreamElementQueue implements StreamElementQueue {
+public final class OrderedStreamElementQueue implements 
StreamElementQueue {
 
private static final Logger LOG = 
LoggerFactory.getLogger(OrderedStreamElementQueue.class);
 
/** Capacity of this queue. */
private final int capacity;
 
-   /** Executor to run the onCompletion callback. */
-   private final Executor executor;
-
-   /** Operator actions to signal a failure to the operator. */
-   private final OperatorActions operatorActions;
-
-   /** Lock and conditions for the blocking queue. */
-   private final ReentrantLock lock;
-   private final Condition notFull;
-   private final Condition headIsCompleted;
-
/** Queue for the inserted StreamElementQueueEntries. */
-   private final ArrayDeque> queue;
-
-   public OrderedStreamElementQueue(
-   int capacity,
-   Executor executor,
-   OperatorActions operatorActions) {
+   private final Queue> queue;
 
+   public OrderedStreamElementQueue(int capacity) {
Preconditions.checkArgument(capacity > 0, "The capacity must be 
larger than 0.");
-   this.capacity = capacity;
-
-   this.executor = Preconditions.checkNotNull(executor, 
"executor");
-
-   this.operatorActions = 
Preconditions.checkNotNull(operatorActions, "operatorActions");
-
-   this.lock = new ReentrantLock(false);
-   this.headIsCompleted = lock.newCondition();
-   this.notFull = lock.newCondition();
 
+   this.capacity = capacity;
this.queue = new ArrayDeque<>(capacity);
}
 
@Override
-   public AsyncResult peekBlockingly() throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   while (queue.isEmpty() || !queue.peek().isDone()) {
-   headIsCompleted.await();
-   }
-
-   LOG.debug("Peeked head element from ordered stream 
element queue with filling degree " +
-   "({}/{}).", queue.size(), capacity);
-
-   return queue.peek();
-   } finally {
-   lock.unlock();
-   }
+   public boolean hasCompleted() {
+   return !queue.isEmpty() && queue.peek().isDone();
}
 
@Override
-   public AsyncResult poll() throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   while (queue.isEmpty() || !queue.peek().isDone()) {
-   headIsCompleted.await();
-   }
-
-   notFull.signalAll();
-
-   LOG.debug("Polled head element from ordered stream 
element queue. New filling degree " +
-   "({}/{}).", queue.size() - 1, capacity);
-
-   return queue.poll();
-   } finally {
-   lock.unlock();
+   public 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954701
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 ##
 @@ -143,88 +92,31 @@ public int size() {
}
 
@Override
-   public  void put(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   while (queue.size() >= capacity) {
-   notFull.await();
-   }
+   public Optional> tryPut(StreamElement streamElement) {
+   if (queue.size() < capacity) {
+   StreamElementQueueEntry queueEntry = 
createEntry(streamElement);
 
-   addEntry(streamElementQueueEntry);
-   } finally {
-   lock.unlock();
-   }
-   }
+   queue.add(queueEntry);
 
-   @Override
-   public  boolean tryPut(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   if (queue.size() < capacity) {
-   addEntry(streamElementQueueEntry);
-
-   LOG.debug("Put element into ordered stream 
element queue. New filling degree " +
-   "({}/{}).", queue.size(), capacity);
+   LOG.debug("Put element into ordered stream element 
queue. New filling degree " +
+   "({}/{}).", queue.size(), capacity);
 
-   return true;
-   } else {
-   LOG.debug("Failed to put element into ordered 
stream element queue because it " +
-   "was full ({}/{}).", queue.size(), 
capacity);
+   return Optional.of(queueEntry);
+   } else {
+   LOG.debug("Failed to put element into ordered stream 
element queue because it " +
+   "was full ({}/{}).", queue.size(), capacity);
 
-   return false;
-   }
-   } finally {
-   lock.unlock();
+   return Optional.empty();
}
}
 
-   /**
-* Add the given {@link StreamElementQueueEntry} to the queue. 
Additionally, this method
-* registers a onComplete callback which is triggered once the given 
queue entry is completed.
-*
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the stream element queue entry's result
-*/
-   private  void addEntry(StreamElementQueueEntry 
streamElementQueueEntry) {
-   assert(lock.isHeldByCurrentThread());
-
-   queue.addLast(streamElementQueueEntry);
-
-   streamElementQueueEntry.onComplete(
-   (StreamElementQueueEntry value) -> {
-   try {
-   onCompleteHandler(value);
-   } catch (InterruptedException e) {
-   // we got interrupted. This indicates a 
shutdown of the executor
-   LOG.debug("AsyncBufferEntry could not 
be properly completed because the " +
-   "executor thread has been 
interrupted.", e);
-   } catch (Throwable t) {
-   operatorActions.failOperator(new 
Exception("Could not complete the " +
-   "stream element queue entry: " 
+ value + '.', t));
-   }
-   },
-   executor);
-   }
-
-   /**
-* Check if the completed {@link StreamElementQueueEntry} is the 
current head. If this is the
-* case, then notify the consumer thread about a new consumable entry.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread is interrupted
-*/
-   private void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   if (!queue.isEmpty() && queue.peek().isDone()) {
-   LOG.debug("Signal ordered stream element queue 
has completed head element.");
-   headIsCompleted.signalAll();
-   }
-   } finally {
-   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954599
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 ##
 @@ -19,117 +19,66 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 
 /**
- * Ordered {@link StreamElementQueue} implementation. The ordered stream 
element queue emits
+ * Ordered {@link StreamElementQueue} implementation. The ordered stream 
element queue provides
  * asynchronous results in the order in which the {@link 
StreamElementQueueEntry} have been added
  * to the queue. Thus, even if the completion order can be arbitrary, the 
output order strictly
  * follows the insertion order (element cannot overtake each other).
  */
 @Internal
-public class OrderedStreamElementQueue implements StreamElementQueue {
+public final class OrderedStreamElementQueue implements 
StreamElementQueue {
 
private static final Logger LOG = 
LoggerFactory.getLogger(OrderedStreamElementQueue.class);
 
/** Capacity of this queue. */
private final int capacity;
 
-   /** Executor to run the onCompletion callback. */
-   private final Executor executor;
-
-   /** Operator actions to signal a failure to the operator. */
-   private final OperatorActions operatorActions;
-
-   /** Lock and conditions for the blocking queue. */
-   private final ReentrantLock lock;
-   private final Condition notFull;
-   private final Condition headIsCompleted;
-
/** Queue for the inserted StreamElementQueueEntries. */
-   private final ArrayDeque> queue;
-
-   public OrderedStreamElementQueue(
-   int capacity,
-   Executor executor,
-   OperatorActions operatorActions) {
+   private final Queue> queue;
 
+   public OrderedStreamElementQueue(int capacity) {
Preconditions.checkArgument(capacity > 0, "The capacity must be 
larger than 0.");
-   this.capacity = capacity;
-
-   this.executor = Preconditions.checkNotNull(executor, 
"executor");
-
-   this.operatorActions = 
Preconditions.checkNotNull(operatorActions, "operatorActions");
-
-   this.lock = new ReentrantLock(false);
-   this.headIsCompleted = lock.newCondition();
-   this.notFull = lock.newCondition();
 
+   this.capacity = capacity;
this.queue = new ArrayDeque<>(capacity);
}
 
@Override
-   public AsyncResult peekBlockingly() throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   while (queue.isEmpty() || !queue.peek().isDone()) {
-   headIsCompleted.await();
-   }
-
-   LOG.debug("Peeked head element from ordered stream 
element queue with filling degree " +
-   "({}/{}).", queue.size(), capacity);
-
-   return queue.peek();
-   } finally {
-   lock.unlock();
-   }
+   public boolean hasCompleted() {
+   return !queue.isEmpty() && queue.peek().isDone();
}
 
@Override
-   public AsyncResult poll() throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   while (queue.isEmpty() || !queue.peek().isDone()) {
-   headIsCompleted.await();
-   }
-
-   notFull.signalAll();
-
-   LOG.debug("Polled head element from ordered stream 
element queue. New filling degree " +
-   "({}/{}).", queue.size() - 1, capacity);
-
-   return queue.poll();
-   } finally {
-   lock.unlock();
+   public 

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954679
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 ##
 @@ -143,88 +92,31 @@ public int size() {
}
 
@Override
-   public  void put(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   while (queue.size() >= capacity) {
-   notFull.await();
-   }
+   public Optional> tryPut(StreamElement streamElement) {
+   if (queue.size() < capacity) {
+   StreamElementQueueEntry queueEntry = 
createEntry(streamElement);
 
-   addEntry(streamElementQueueEntry);
-   } finally {
-   lock.unlock();
-   }
-   }
+   queue.add(queueEntry);
 
-   @Override
-   public  boolean tryPut(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   if (queue.size() < capacity) {
-   addEntry(streamElementQueueEntry);
-
-   LOG.debug("Put element into ordered stream 
element queue. New filling degree " +
-   "({}/{}).", queue.size(), capacity);
+   LOG.debug("Put element into ordered stream element 
queue. New filling degree " +
+   "({}/{}).", queue.size(), capacity);
 
-   return true;
-   } else {
-   LOG.debug("Failed to put element into ordered 
stream element queue because it " +
-   "was full ({}/{}).", queue.size(), 
capacity);
+   return Optional.of(queueEntry);
+   } else {
+   LOG.debug("Failed to put element into ordered stream 
element queue because it " +
+   "was full ({}/{}).", queue.size(), capacity);
 
-   return false;
-   }
-   } finally {
-   lock.unlock();
+   return Optional.empty();
}
}
 
-   /**
-* Add the given {@link StreamElementQueueEntry} to the queue. 
Additionally, this method
-* registers a onComplete callback which is triggered once the given 
queue entry is completed.
-*
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the stream element queue entry's result
-*/
-   private  void addEntry(StreamElementQueueEntry 
streamElementQueueEntry) {
-   assert(lock.isHeldByCurrentThread());
-
-   queue.addLast(streamElementQueueEntry);
-
-   streamElementQueueEntry.onComplete(
-   (StreamElementQueueEntry value) -> {
-   try {
-   onCompleteHandler(value);
-   } catch (InterruptedException e) {
-   // we got interrupted. This indicates a 
shutdown of the executor
-   LOG.debug("AsyncBufferEntry could not 
be properly completed because the " +
-   "executor thread has been 
interrupted.", e);
-   } catch (Throwable t) {
-   operatorActions.failOperator(new 
Exception("Could not complete the " +
-   "stream element queue entry: " 
+ value + '.', t));
-   }
-   },
-   executor);
-   }
-
-   /**
-* Check if the completed {@link StreamElementQueueEntry} is the 
current head. If this is the
-* case, then notify the consumer thread about a new consumable entry.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread is interrupted
-*/
-   private void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   if (!queue.isEmpty() && queue.peek().isDone()) {
-   LOG.debug("Signal ordered stream element queue 
has completed head element.");
-   headIsCompleted.signalAll();
-   }
-   } finally {
-   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954701
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 ##
 @@ -143,88 +92,31 @@ public int size() {
}
 
@Override
-   public  void put(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   while (queue.size() >= capacity) {
-   notFull.await();
-   }
+   public Optional> tryPut(StreamElement streamElement) {
+   if (queue.size() < capacity) {
+   StreamElementQueueEntry queueEntry = 
createEntry(streamElement);
 
-   addEntry(streamElementQueueEntry);
-   } finally {
-   lock.unlock();
-   }
-   }
+   queue.add(queueEntry);
 
-   @Override
-   public  boolean tryPut(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   if (queue.size() < capacity) {
-   addEntry(streamElementQueueEntry);
-
-   LOG.debug("Put element into ordered stream 
element queue. New filling degree " +
-   "({}/{}).", queue.size(), capacity);
+   LOG.debug("Put element into ordered stream element 
queue. New filling degree " +
+   "({}/{}).", queue.size(), capacity);
 
-   return true;
-   } else {
-   LOG.debug("Failed to put element into ordered 
stream element queue because it " +
-   "was full ({}/{}).", queue.size(), 
capacity);
+   return Optional.of(queueEntry);
+   } else {
+   LOG.debug("Failed to put element into ordered stream 
element queue because it " +
+   "was full ({}/{}).", queue.size(), capacity);
 
-   return false;
-   }
-   } finally {
-   lock.unlock();
+   return Optional.empty();
}
}
 
-   /**
-* Add the given {@link StreamElementQueueEntry} to the queue. 
Additionally, this method
-* registers a onComplete callback which is triggered once the given 
queue entry is completed.
-*
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the stream element queue entry's result
-*/
-   private  void addEntry(StreamElementQueueEntry 
streamElementQueueEntry) {
-   assert(lock.isHeldByCurrentThread());
-
-   queue.addLast(streamElementQueueEntry);
-
-   streamElementQueueEntry.onComplete(
-   (StreamElementQueueEntry value) -> {
-   try {
-   onCompleteHandler(value);
-   } catch (InterruptedException e) {
-   // we got interrupted. This indicates a 
shutdown of the executor
-   LOG.debug("AsyncBufferEntry could not 
be properly completed because the " +
-   "executor thread has been 
interrupted.", e);
-   } catch (Throwable t) {
-   operatorActions.failOperator(new 
Exception("Could not complete the " +
-   "stream element queue entry: " 
+ value + '.', t));
-   }
-   },
-   executor);
-   }
-
-   /**
-* Check if the completed {@link StreamElementQueueEntry} is the 
current head. If this is the
-* case, then notify the consumer thread about a new consumable entry.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread is interrupted
-*/
-   private void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   if (!queue.isEmpty() && queue.peek().isDone()) {
-   LOG.debug("Signal ordered stream element queue 
has completed head element.");
-   headIsCompleted.signalAll();
-   }
-   } finally {
-   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954911
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##
 @@ -19,68 +19,56 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for a non-blocking stream element queues for the {@link 
AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue {
 
/**
-* Put the given element in the queue if capacity is left. If not, then 
block until this is
-* the case.
+* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
 
 Review comment:
   Try -> Tries


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954679
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 ##
 @@ -143,88 +92,31 @@ public int size() {
}
 
@Override
-   public  void put(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   while (queue.size() >= capacity) {
-   notFull.await();
-   }
+   public Optional> tryPut(StreamElement streamElement) {
+   if (queue.size() < capacity) {
+   StreamElementQueueEntry queueEntry = 
createEntry(streamElement);
 
-   addEntry(streamElementQueueEntry);
-   } finally {
-   lock.unlock();
-   }
-   }
+   queue.add(queueEntry);
 
-   @Override
-   public  boolean tryPut(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   if (queue.size() < capacity) {
-   addEntry(streamElementQueueEntry);
-
-   LOG.debug("Put element into ordered stream 
element queue. New filling degree " +
-   "({}/{}).", queue.size(), capacity);
+   LOG.debug("Put element into ordered stream element 
queue. New filling degree " +
+   "({}/{}).", queue.size(), capacity);
 
-   return true;
-   } else {
-   LOG.debug("Failed to put element into ordered 
stream element queue because it " +
-   "was full ({}/{}).", queue.size(), 
capacity);
+   return Optional.of(queueEntry);
+   } else {
+   LOG.debug("Failed to put element into ordered stream 
element queue because it " +
+   "was full ({}/{}).", queue.size(), capacity);
 
-   return false;
-   }
-   } finally {
-   lock.unlock();
+   return Optional.empty();
}
}
 
-   /**
-* Add the given {@link StreamElementQueueEntry} to the queue. 
Additionally, this method
-* registers a onComplete callback which is triggered once the given 
queue entry is completed.
-*
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the stream element queue entry's result
-*/
-   private  void addEntry(StreamElementQueueEntry 
streamElementQueueEntry) {
-   assert(lock.isHeldByCurrentThread());
-
-   queue.addLast(streamElementQueueEntry);
-
-   streamElementQueueEntry.onComplete(
-   (StreamElementQueueEntry value) -> {
-   try {
-   onCompleteHandler(value);
-   } catch (InterruptedException e) {
-   // we got interrupted. This indicates a 
shutdown of the executor
-   LOG.debug("AsyncBufferEntry could not 
be properly completed because the " +
-   "executor thread has been 
interrupted.", e);
-   } catch (Throwable t) {
-   operatorActions.failOperator(new 
Exception("Could not complete the " +
-   "stream element queue entry: " 
+ value + '.', t));
-   }
-   },
-   executor);
-   }
-
-   /**
-* Check if the completed {@link StreamElementQueueEntry} is the 
current head. If this is the
-* case, then notify the consumer thread about a new consumable entry.
-*
-* @param streamElementQueueEntry which has been completed
-* @throws InterruptedException if the current thread is interrupted
-*/
-   private void onCompleteHandler(StreamElementQueueEntry 
streamElementQueueEntry) throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   if (!queue.isEmpty() && queue.peek().isDone()) {
-   LOG.debug("Signal ordered stream element queue 
has completed head element.");
-   headIsCompleted.signalAll();
-   }
-   } finally {
-   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-25 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954640
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 ##
 @@ -143,88 +92,31 @@ public int size() {
}
 
@Override
-   public  void put(StreamElementQueueEntry streamElementQueueEntry) 
throws InterruptedException {
-   lock.lockInterruptibly();
-
-   try {
-   while (queue.size() >= capacity) {
-   notFull.await();
-   }
+   public Optional> tryPut(StreamElement streamElement) {
+   if (queue.size() < capacity) {
+   StreamElementQueueEntry queueEntry = 
createEntry(streamElement);
 
 Review comment:
   `StreamElementQueueEntry queueEntry` to avoid unchecked.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-24 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327552339
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
 ##
 @@ -19,68 +19,54 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /**
- * Interface for blocking stream element queues for the {@link 
AsyncWaitOperator}.
+ * Interface for a non-blocking stream element queues for the {@link 
AsyncWaitOperator}.
  */
 @Internal
-public interface StreamElementQueue {
+public interface StreamElementQueue {
 
/**
-* Put the given element in the queue if capacity is left. If not, then 
block until this is
-* the case.
+* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity left and fails if
+* the queue is full.
 *
-* @param streamElementQueueEntry to be put into the queue
-* @param  Type of the entries future value
-* @throws InterruptedException if the calling thread has been 
interrupted while waiting to
-*  insert the given element
-*/
-void put(StreamElementQueueEntry streamElementQueueEntry) throws 
InterruptedException;
-
-   /**
-* Try to put the given element in the queue. This operation succeeds 
if the queue has capacity
-* left and fails if the queue is full.
-*
-* @param streamElementQueueEntry to be inserted
-* @param  Type of the entries future value
+* @param streamElement to be inserted
 * @return True if the entry could be inserted; otherwise false
 
 Review comment:
   We need to adjust the return javadoc because it is not boolean value now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-24 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327551382
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -209,41 +176,34 @@ else if (element.isLatencyMarker()) {
}
 
@Override
-   public void processElement(StreamRecord element) throws Exception {
-   final StreamRecordQueueEntry streamRecordBufferEntry = new 
StreamRecordQueueEntry<>(element);
+   public void processElement(final StreamRecord element) throws 
Exception {
+   // add element first to the queue
+   final ResultFuture entry = addToWorkQueue(element);
+
+   final ResultHandler resultHandler = new ResultHandler(element, 
entry);
 
+   // register a timeout for the entry if timeout is configured
if (timeout > 0L) {
-   // register a timeout for this 
AsyncStreamRecordBufferEntry
-   long timeoutTimestamp = timeout + 
getProcessingTimeService().getCurrentProcessingTime();
-
-   final ScheduledFuture timerFuture = 
getProcessingTimeService().registerTimer(
-   timeoutTimestamp,
-   new ProcessingTimeCallback() {
-   @Override
-   public void onProcessingTime(long 
timestamp) throws Exception {
-   
userFunction.timeout(element.getValue(), streamRecordBufferEntry);
-   }
-   });
-
-   // Cancel the timer once we've completed the stream 
record buffer entry. This will remove
-   // the register trigger task
-   streamRecordBufferEntry.onComplete(
-   (StreamElementQueueEntry> 
value) -> {
-   timerFuture.cancel(true);
-   },
-   executor);
-   }
+   final long timeoutTimestamp = timeout + 
getProcessingTimeService().getCurrentProcessingTime();
 
-   addAsyncBufferEntry(streamRecordBufferEntry);
+   final ScheduledFuture timeoutTimer = 
getProcessingTimeService().registerTimer(
+   timeoutTimestamp,
+   timestamp -> 
userFunction.timeout(element.getValue(), resultHandler));
 
-   userFunction.asyncInvoke(element.getValue(), 
streamRecordBufferEntry);
+   resultHandler.setTimeoutTimer(timeoutTimer);
 
 Review comment:
   I have potential worries of setting timer delay here. I mean if above 
`registerTimer` happens before this setting, and during 
`ResultHandler#complete` if `mailboxExecutor.execute` also happens immediately, 
then the un-set timers would not be canceled any more.  
   
   I know it would not happen atm because `mailboxExecutor.execute` just 
inserts into queue and would not execute right now. Maybe my worry could be 
ignored.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-24 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327551424
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -209,41 +176,31 @@ else if (element.isLatencyMarker()) {
}
 
@Override
-   public void processElement(StreamRecord element) throws Exception {
-   final StreamRecordQueueEntry streamRecordBufferEntry = new 
StreamRecordQueueEntry<>(element);
+   public void processElement(final StreamRecord element) throws 
Exception {
+   // add element first to the queue
+   final ResultFuture entry = addToWorkQueue(element);
+
+   final ResultHandler resultHandler = new ResultHandler(element, 
entry);
 
+   // register a timeout for the entry if timeout is configured
if (timeout > 0L) {
-   // register a timeout for this 
AsyncStreamRecordBufferEntry
-   long timeoutTimestamp = timeout + 
getProcessingTimeService().getCurrentProcessingTime();
-
-   final ScheduledFuture timerFuture = 
getProcessingTimeService().registerTimer(
-   timeoutTimestamp,
-   new ProcessingTimeCallback() {
-   @Override
-   public void onProcessingTime(long 
timestamp) throws Exception {
-   
userFunction.timeout(element.getValue(), streamRecordBufferEntry);
-   }
-   });
-
-   // Cancel the timer once we've completed the stream 
record buffer entry. This will remove
-   // the register trigger task
-   streamRecordBufferEntry.onComplete(
-   (StreamElementQueueEntry> 
value) -> {
-   timerFuture.cancel(true);
-   },
-   executor);
-   }
+   final long timeoutTimestamp = timeout + 
getProcessingTimeService().getCurrentProcessingTime();
 
-   addAsyncBufferEntry(streamRecordBufferEntry);
+   final ScheduledFuture timeoutTimer = 
getProcessingTimeService().registerTimer(
+   timeoutTimestamp,
+   timestamp -> 
userFunction.timeout(element.getValue(), resultHandler));
 
-   userFunction.asyncInvoke(element.getValue(), 
streamRecordBufferEntry);
+   resultHandler.setTimeoutTimer(timeoutTimer);
+   }
+
+   userFunction.asyncInvoke(element.getValue(), resultHandler);
}
 
@Override
public void processWatermark(Watermark mark) throws Exception {
-   WatermarkQueueEntry watermarkBufferEntry = new 
WatermarkQueueEntry(mark);
+   addToWorkQueue(mark);
 
-   addAsyncBufferEntry(watermarkBufferEntry);
+   outputCompletedElements();
 
 Review comment:
   Sorry for touching this issue again. I regard it as a separate improvement 
out of this pr scope, but I could also accept it if you wish. I am not quite 
sure whether it is worth verifying the effect of this improvement. Maybe not 
very necessary. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-24 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327551484
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -276,10 +232,10 @@ public void snapshotState(StateSnapshotContext context) 
throws Exception {
@Override
public void initializeState(StateInitializationContext context) throws 
Exception {
super.initializeState(context);
+
 
 Review comment:
   nit: unnecessary change. If we want to reformat the codes it is better for a 
separate hotfix commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-24 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327551509
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -276,10 +232,10 @@ public void snapshotState(StateSnapshotContext context) 
throws Exception {
@Override
public void initializeState(StateInitializationContext context) throws 
Exception {
super.initializeState(context);
+
recoveredStreamElements = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>(STATE_NAME, 
inStreamElementSerializer));
-
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-24 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327551549
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -293,140 +249,113 @@ public void close() throws Exception {
waitInFlightInputsFinished();
}
finally {
-   Exception exception = null;
-
-   try {
-   super.close();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
-   }
-
-   try {
-   // terminate the emitter, the emitter thread 
and the executor
-   stopResources(true);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
-
-   if (exception != null) {
-   LOG.warn("Errors occurred while closing the 
AsyncWaitOperator.", exception);
-   }
+   super.close();
}
}
 
-   @Override
-   public void dispose() throws Exception {
-   Exception exception = null;
+   /**
+* Add the given stream element to the operator's stream element queue. 
This operation blocks until the element
+* has been added.
+*
+* Between two insertion attempts, this method yields the execution 
to the mailbox, such that events as well
+* as asynchronous results can be processed.
+*
+* @param streamElement to add to the operator's queue
+* @throws InterruptedException if the current thread has been 
interrupted while yielding to mailbox
+* @return a handle that allows to set the result of the async 
computation for the given element.
+*/
+   private ResultFuture addToWorkQueue(StreamElement streamElement) 
throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   try {
-   super.dispose();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
+   pendingStreamElement = streamElement;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
+   Optional> queueEntry;
+   while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) 
{
+   mailboxExecutor.yield();
 
 Review comment:
   How to guarantee that `yield()` could consume the elements from `queue` 
finally?
   
   In previous way the `EmitterThread` could consume the queue, but now I did 
not see any runnable to be submitted into mailbox for consuming the queue. I 
know `outputCompletedElements()` could empty the queue, but it can not happen 
before calling `userFunction.asyncInvoke(element.getValue(), resultHandler)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-24 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327551609
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -293,140 +249,113 @@ public void close() throws Exception {
waitInFlightInputsFinished();
}
finally {
-   Exception exception = null;
-
-   try {
-   super.close();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
-   }
-
-   try {
-   // terminate the emitter, the emitter thread 
and the executor
-   stopResources(true);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
-
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
-
-   if (exception != null) {
-   LOG.warn("Errors occurred while closing the 
AsyncWaitOperator.", exception);
-   }
+   super.close();
}
}
 
-   @Override
-   public void dispose() throws Exception {
-   Exception exception = null;
+   /**
+* Add the given stream element to the operator's stream element queue. 
This operation blocks until the element
+* has been added.
+*
+* Between two insertion attempts, this method yields the execution 
to the mailbox, such that events as well
+* as asynchronous results can be processed.
+*
+* @param streamElement to add to the operator's queue
+* @throws InterruptedException if the current thread has been 
interrupted while yielding to mailbox
+* @return a handle that allows to set the result of the async 
computation for the given element.
+*/
+   private ResultFuture addToWorkQueue(StreamElement streamElement) 
throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   try {
-   super.dispose();
-   } catch (InterruptedException interrupted) {
-   exception = interrupted;
+   pendingStreamElement = streamElement;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = e;
+   Optional> queueEntry;
+   while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) 
{
+   mailboxExecutor.yield();
}
 
-   try {
-   stopResources(false);
-   } catch (InterruptedException interrupted) {
-   exception = 
ExceptionUtils.firstOrSuppressed(interrupted, exception);
+   pendingStreamElement = null;
 
-   Thread.currentThread().interrupt();
-   } catch (Exception e) {
-   exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-   }
+   return queueEntry.get();
+   }
+
+   private void waitInFlightInputsFinished() throws InterruptedException {
+   assert(Thread.holdsLock(checkpointingLock));
 
-   if (exception != null) {
-   throw exception;
+   while (!queue.isEmpty()) {
+   mailboxExecutor.yield();
}
}
 
/**
-* Close the operator's resources. They include the emitter thread and 
the executor to run
-* the queue's complete operation.
+* Batch output of all completed elements. Watermarks are always 
completed if it's their turn to be processed.
 *
-* @param waitForShutdown is true if the method should wait for the 
resources to be freed;
-*   otherwise false.
-* @throws InterruptedException if current thread has been interrupted
+* This method will be called from {@link 
#processWatermark(Watermark)} and from a mail processing the result
+* of an async function call.
 */
-   private void stopResources(boolean waitForShutdown) throws 
InterruptedException {
-   emitter.stop();
-   

[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator

2019-09-24 Thread GitBox
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327551336
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##
 @@ -209,41 +176,34 @@ else if (element.isLatencyMarker()) {
}
 
@Override
-   public void processElement(StreamRecord element) throws Exception {
-   final StreamRecordQueueEntry streamRecordBufferEntry = new 
StreamRecordQueueEntry<>(element);
+   public void processElement(final StreamRecord element) throws 
Exception {
 
 Review comment:
   Nit: this change might not be suggested mixing with the core changes. Unless 
this line was also touched by the core change, then we could add the `final` in 
passing. From the consistent view,  in the following 
`processWatermark(Watermark mark)` we also have no final argument.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services