[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328576174 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -293,140 +252,120 @@ public void close() throws Exception { waitInFlightInputsFinished(); } finally { - Exception exception = null; - - try { - super.close(); - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; - } - - try { - // terminate the emitter, the emitter thread and the executor - stopResources(true); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); - } + super.close(); } } - @Override - public void dispose() throws Exception { - Exception exception = null; + /** +* Add the given stream element to the operator's stream element queue. This operation blocks until the element +* has been added. +* +* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well +* as asynchronous results can be processed. +* +* @param streamElement to add to the operator's queue +* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox +* @return a handle that allows to set the result of the async computation for the given element. 
+*/ + private ResultFuture addToWorkQueue(StreamElement streamElement) throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - try { - super.dispose(); - } catch (InterruptedException interrupted) { - exception = interrupted; + pendingStreamElement = streamElement; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; + Optional> queueEntry; + while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { + mailboxExecutor.yield(); } - try { - stopResources(false); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); + pendingStreamElement = null; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + return queueEntry.get(); + } + + private void waitInFlightInputsFinished() throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - if (exception != null) { - throw exception; + while (!queue.isEmpty()) { + mailboxExecutor.yield(); } } /** -* Close the operator's resources. They include the emitter thread and the executor to run -* the queue's complete operation. +* Batch output of all completed elements. Watermarks are always completed if it's their turn to be processed. * -* @param waitForShutdown is true if the method should wait for the resources to be freed; -* otherwise false. -* @throws InterruptedException if current thread has been interrupted +* This method will be called from {@link #processWatermark(Watermark)} and from a mail processing the result +* of an async function call. */ - private void stopResources(boolean waitForShutdown) throws InterruptedException { - emitter.stop(); -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328572264 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -293,140 +252,120 @@ public void close() throws Exception { waitInFlightInputsFinished(); } finally { - Exception exception = null; - - try { - super.close(); - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; - } - - try { - // terminate the emitter, the emitter thread and the executor - stopResources(true); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); - } + super.close(); } } - @Override - public void dispose() throws Exception { - Exception exception = null; + /** +* Add the given stream element to the operator's stream element queue. This operation blocks until the element +* has been added. +* +* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well +* as asynchronous results can be processed. +* +* @param streamElement to add to the operator's queue +* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox +* @return a handle that allows to set the result of the async computation for the given element. 
+*/ + private ResultFuture addToWorkQueue(StreamElement streamElement) throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - try { - super.dispose(); - } catch (InterruptedException interrupted) { - exception = interrupted; + pendingStreamElement = streamElement; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; + Optional> queueEntry; + while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { + mailboxExecutor.yield(); } - try { - stopResources(false); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); + pendingStreamElement = null; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + return queueEntry.get(); + } + + private void waitInFlightInputsFinished() throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - if (exception != null) { - throw exception; + while (!queue.isEmpty()) { + mailboxExecutor.yield(); } } /** -* Close the operator's resources. They include the emitter thread and the executor to run -* the queue's complete operation. +* Batch output of all completed elements. Watermarks are always completed if it's their turn to be processed. * -* @param waitForShutdown is true if the method should wait for the resources to be freed; -* otherwise false. -* @throws InterruptedException if current thread has been interrupted +* This method will be called from {@link #processWatermark(Watermark)} and from a mail processing the result +* of an async function call. */ - private void stopResources(boolean waitForShutdown) throws InterruptedException { - emitter.stop(); -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328571467 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -293,140 +252,120 @@ public void close() throws Exception { waitInFlightInputsFinished(); } finally { - Exception exception = null; - - try { - super.close(); - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; - } - - try { - // terminate the emitter, the emitter thread and the executor - stopResources(true); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); - } + super.close(); } } - @Override - public void dispose() throws Exception { - Exception exception = null; + /** +* Add the given stream element to the operator's stream element queue. This operation blocks until the element +* has been added. +* +* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well +* as asynchronous results can be processed. +* +* @param streamElement to add to the operator's queue +* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox +* @return a handle that allows to set the result of the async computation for the given element. 
+*/ + private ResultFuture addToWorkQueue(StreamElement streamElement) throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - try { - super.dispose(); - } catch (InterruptedException interrupted) { - exception = interrupted; + pendingStreamElement = streamElement; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; + Optional> queueEntry; + while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { + mailboxExecutor.yield(); } - try { - stopResources(false); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); + pendingStreamElement = null; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + return queueEntry.get(); + } + + private void waitInFlightInputsFinished() throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - if (exception != null) { - throw exception; + while (!queue.isEmpty()) { + mailboxExecutor.yield(); } } /** -* Close the operator's resources. They include the emitter thread and the executor to run -* the queue's complete operation. +* Batch output of all completed elements. Watermarks are always completed if it's their turn to be processed. * -* @param waitForShutdown is true if the method should wait for the resources to be freed; -* otherwise false. -* @throws InterruptedException if current thread has been interrupted +* This method will be called from {@link #processWatermark(Watermark)} and from a mail processing the result +* of an async function call. */ - private void stopResources(boolean waitForShutdown) throws InterruptedException { - emitter.stop(); -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328544533 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java ## @@ -18,174 +18,108 @@ package org.apache.flink.streaming.api.operators.async.queue; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; + +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully; /** * {@link UnorderedStreamElementQueue} specific tests. 
*/ public class UnorderedStreamElementQueueTest extends TestLogger { - private static final long timeout = 1L; - private static ExecutorService executor; - - @BeforeClass - public static void setup() { - executor = Executors.newFixedThreadPool(3); - } - - @AfterClass - public static void shutdown() { - executor.shutdown(); - - try { - if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException interrupted) { - executor.shutdownNow(); - - Thread.currentThread().interrupt(); - } - } - /** * Tests that only elements before the oldest watermark are returned if they are completed. */ @Test - public void testCompletionOrder() throws Exception { - OperatorActions operatorActions = mock(OperatorActions.class); - - final UnorderedStreamElementQueue queue = new UnorderedStreamElementQueue(8, executor, operatorActions); - - StreamRecordQueueEntry record1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L)); - StreamRecordQueueEntry record2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L)); - WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(2L)); - StreamRecordQueueEntry record3 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L)); - StreamRecordQueueEntry record4 = new StreamRecordQueueEntry<>(new StreamRecord<>(4, 4L)); - WatermarkQueueEntry watermark2 = new WatermarkQueueEntry(new Watermark(5L)); - StreamRecordQueueEntry record5 = new StreamRecordQueueEntry<>(new StreamRecord<>(5, 6L)); - StreamRecordQueueEntry record6 = new StreamRecordQueueEntry<>(new StreamRecord<>(6, 7L)); - - List> entries = Arrays.asList(record1, record2, watermark1, record3, - record4, watermark2, record5, record6); - - // The queue should look like R1, R2, W1, R3, R4, W2, R5, R6 - for (StreamElementQueueEntry entry : entries) { - queue.put(entry); - } - - Assert.assertTrue(8 == queue.size()); - - CompletableFuture firstPoll = CompletableFuture.supplyAsync( - () -> { - try { - return 
queue.poll(); - } catch (InterruptedException e) { - throw new CompletionException(e); - } - }, - executor); - - // this should not fulfill the poll, because R3 is behind W1 - record3.complete(Collections.emptyList()); + public void testCompletionOrder() { + final UnorderedStreamElementQueue queue = new UnorderedStreamElementQueue<>(8); - Thread.sleep(10L); + ResultFuture record1 = putSucessfully(queue, new StreamRecord<>(1, 0L)); + ResultFuture record2 =
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328422187 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. Review comment: Yes. Elements within a stage could overtake each other. One more thing to confirm: how do we guarantee that two stages cannot overtake each other? E.g., assume we have four stages: {a, b, c} {watermark1} {d, e, f} {watermark2}, and the first stage is {a, b, c}. If the user function completes the element `e` in the third stage first, does that mean the collection of `e` would be emitted to the output before the first stage? Or did I miss something? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328541881 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java ## @@ -18,174 +18,108 @@ package org.apache.flink.streaming.api.operators.async.queue; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; + +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully; /** * {@link UnorderedStreamElementQueue} specific tests. 
*/ public class UnorderedStreamElementQueueTest extends TestLogger { - private static final long timeout = 1L; - private static ExecutorService executor; - - @BeforeClass - public static void setup() { - executor = Executors.newFixedThreadPool(3); - } - - @AfterClass - public static void shutdown() { - executor.shutdown(); - - try { - if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException interrupted) { - executor.shutdownNow(); - - Thread.currentThread().interrupt(); - } - } - /** * Tests that only elements before the oldest watermark are returned if they are completed. */ @Test - public void testCompletionOrder() throws Exception { - OperatorActions operatorActions = mock(OperatorActions.class); - - final UnorderedStreamElementQueue queue = new UnorderedStreamElementQueue(8, executor, operatorActions); - - StreamRecordQueueEntry record1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L)); - StreamRecordQueueEntry record2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L)); - WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(2L)); - StreamRecordQueueEntry record3 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L)); - StreamRecordQueueEntry record4 = new StreamRecordQueueEntry<>(new StreamRecord<>(4, 4L)); - WatermarkQueueEntry watermark2 = new WatermarkQueueEntry(new Watermark(5L)); - StreamRecordQueueEntry record5 = new StreamRecordQueueEntry<>(new StreamRecord<>(5, 6L)); - StreamRecordQueueEntry record6 = new StreamRecordQueueEntry<>(new StreamRecord<>(6, 7L)); - - List> entries = Arrays.asList(record1, record2, watermark1, record3, - record4, watermark2, record5, record6); - - // The queue should look like R1, R2, W1, R3, R4, W2, R5, R6 - for (StreamElementQueueEntry entry : entries) { - queue.put(entry); - } - - Assert.assertTrue(8 == queue.size()); - - CompletableFuture firstPoll = CompletableFuture.supplyAsync( - () -> { - try { - return 
queue.poll(); - } catch (InterruptedException e) { - throw new CompletionException(e); - } - }, - executor); - - // this should not fulfill the poll, because R3 is behind W1 - record3.complete(Collections.emptyList()); + public void testCompletionOrder() { + final UnorderedStreamElementQueue queue = new UnorderedStreamElementQueue<>(8); - Thread.sleep(10L); + ResultFuture record1 = putSucessfully(queue, new StreamRecord<>(1, 0L)); + ResultFuture record2 =
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328538591 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java ## @@ -18,253 +18,117 @@ package org.apache.flink.streaming.api.operators.async.queue; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.OrderedStreamElementQueueType; -import static org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.UnorderedStreamElementQueueType; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully; +import 
static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putUnsucessfully; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Tests for the basic functionality of {@link StreamElementQueue}. The basic operations consist * of putting and polling elements from the queue. */ @RunWith(Parameterized.class) public class StreamElementQueueTest extends TestLogger { - - private static final long timeout = 1L; - private static ExecutorService executor; - - @BeforeClass - public static void setup() { - executor = Executors.newFixedThreadPool(3); - } - - @AfterClass - public static void shutdown() { - executor.shutdown(); - - try { - if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException interrupted) { - executor.shutdownNow(); - - Thread.currentThread().interrupt(); - } - } - - enum StreamElementQueueType { - OrderedStreamElementQueueType, - UnorderedStreamElementQueueType - } - @Parameterized.Parameters - public static Collection streamElementQueueTypes() { - return Arrays.asList(OrderedStreamElementQueueType, UnorderedStreamElementQueueType); + public static Collection streamElementQueueTypes() { Review comment: streamElementQueueTypes -> outputModes This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328538575 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java ## @@ -18,253 +18,117 @@ package org.apache.flink.streaming.api.operators.async.queue; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.OrderedStreamElementQueueType; -import static org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.UnorderedStreamElementQueueType; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully; +import 
static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putUnsucessfully; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Tests for the basic functionality of {@link StreamElementQueue}. The basic operations consist * of putting and polling elements from the queue. */ @RunWith(Parameterized.class) public class StreamElementQueueTest extends TestLogger { - - private static final long timeout = 1L; - private static ExecutorService executor; - - @BeforeClass - public static void setup() { - executor = Executors.newFixedThreadPool(3); - } - - @AfterClass - public static void shutdown() { - executor.shutdown(); - - try { - if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException interrupted) { - executor.shutdownNow(); - - Thread.currentThread().interrupt(); - } - } - - enum StreamElementQueueType { - OrderedStreamElementQueueType, - UnorderedStreamElementQueueType - } - @Parameterized.Parameters - public static Collection streamElementQueueTypes() { - return Arrays.asList(OrderedStreamElementQueueType, UnorderedStreamElementQueueType); + public static Collection streamElementQueueTypes() { + return Arrays.asList(AsyncDataStream.OutputMode.ORDERED, AsyncDataStream.OutputMode.UNORDERED); } - private final StreamElementQueueType streamElementQueueType; + private final AsyncDataStream.OutputMode outputMode; - public StreamElementQueueTest(StreamElementQueueType streamElementQueueType) { - this.streamElementQueueType = Preconditions.checkNotNull(streamElementQueueType); + public StreamElementQueueTest(AsyncDataStream.OutputMode outputMode) { + this.outputMode = Preconditions.checkNotNull(outputMode); } - public StreamElementQueue createStreamElementQueue(int capacity, OperatorActions operatorActions) { - switch (streamElementQueueType) { - case OrderedStreamElementQueueType: - return new 
OrderedStreamElementQueue(capacity, executor, operatorActions); - case UnorderedStreamElementQueueType: - return new UnorderedStreamElementQueue(capacity, executor, operatorActions); + public StreamElementQueue createStreamElementQueue(int capacity) { Review comment: public -> private This is an automated message from the Apache
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328526479 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java ## @@ -18,110 +18,82 @@ package org.apache.flink.streaming.api.operators.async.queue; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully; + /** * {@link OrderedStreamElementQueue} specific tests. 
*/ public class OrderedStreamElementQueueTest extends TestLogger { - - private static final long timeout = 1L; - private static ExecutorService executor; - - @BeforeClass - public static void setup() { - executor = Executors.newFixedThreadPool(3); - } - - @AfterClass - public static void shutdown() { - executor.shutdown(); - - try { - if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException interrupted) { - executor.shutdownNow(); - - Thread.currentThread().interrupt(); - } - } - /** * Tests that only the head element is pulled from the ordered queue if it has been * completed. */ @Test - public void testCompletionOrder() throws Exception { - OperatorActions operatorActions = mock(OperatorActions.class); - final OrderedStreamElementQueue queue = new OrderedStreamElementQueue(4, executor, operatorActions); - - StreamRecordQueueEntry entry1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L)); - StreamRecordQueueEntry entry2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L)); - WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new Watermark(2L)); - StreamRecordQueueEntry entry4 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L)); - - List> expected = Arrays.asList(entry1, entry2, entry3, entry4); - - for (StreamElementQueueEntry entry : expected) { - queue.put(entry); - } + public void testCompletionOrder() { + final OrderedStreamElementQueue queue = new OrderedStreamElementQueue<>(4); - CompletableFuture> pollOperation = CompletableFuture.supplyAsync( - () -> { - List result = new ArrayList<>(4); - while (!queue.isEmpty()) { - try { - result.add(queue.poll()); - } catch (InterruptedException e) { - throw new CompletionException(e); - } - } - - return result; - }, - executor); - - Thread.sleep(10L); - - Assert.assertFalse(pollOperation.isDone()); - - entry2.complete(Collections.emptyList()); - - entry4.complete(Collections.emptyList()); - - Thread.sleep(10L); + ResultFuture 
entry1 = putSucessfully(queue, new StreamRecord<>(1, 0L)); + ResultFuture entry2 = putSucessfully(queue, new StreamRecord<>(2, 1L)); + putSucessfully(queue, new Watermark(2L)); + ResultFuture entry4 = putSucessfully(queue, new StreamRecord<>(3, 3L)); +
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328525293 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java ## @@ -18,110 +18,82 @@ package org.apache.flink.streaming.api.operators.async.queue; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully; + /** * {@link OrderedStreamElementQueue} specific tests. 
*/ public class OrderedStreamElementQueueTest extends TestLogger { - - private static final long timeout = 1L; - private static ExecutorService executor; - - @BeforeClass - public static void setup() { - executor = Executors.newFixedThreadPool(3); - } - - @AfterClass - public static void shutdown() { - executor.shutdown(); - - try { - if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException interrupted) { - executor.shutdownNow(); - - Thread.currentThread().interrupt(); - } - } - /** * Tests that only the head element is pulled from the ordered queue if it has been * completed. */ @Test - public void testCompletionOrder() throws Exception { - OperatorActions operatorActions = mock(OperatorActions.class); - final OrderedStreamElementQueue queue = new OrderedStreamElementQueue(4, executor, operatorActions); - - StreamRecordQueueEntry entry1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L)); - StreamRecordQueueEntry entry2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L)); - WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new Watermark(2L)); - StreamRecordQueueEntry entry4 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L)); - - List> expected = Arrays.asList(entry1, entry2, entry3, entry4); - - for (StreamElementQueueEntry entry : expected) { - queue.put(entry); - } + public void testCompletionOrder() { + final OrderedStreamElementQueue queue = new OrderedStreamElementQueue<>(4); - CompletableFuture> pollOperation = CompletableFuture.supplyAsync( - () -> { - List result = new ArrayList<>(4); - while (!queue.isEmpty()) { - try { - result.add(queue.poll()); - } catch (InterruptedException e) { - throw new CompletionException(e); - } - } - - return result; - }, - executor); - - Thread.sleep(10L); - - Assert.assertFalse(pollOperation.isDone()); - - entry2.complete(Collections.emptyList()); - - entry4.complete(Collections.emptyList()); - - Thread.sleep(10L); + ResultFuture 
entry1 = putSucessfully(queue, new StreamRecord<>(1, 0L)); + ResultFuture entry2 = putSucessfully(queue, new StreamRecord<>(2, 1L)); + putSucessfully(queue, new Watermark(2L)); + ResultFuture entry4 = putSucessfully(queue, new StreamRecord<>(3, 3L)); +
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328525357 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java ## @@ -18,110 +18,82 @@ package org.apache.flink.streaming.api.operators.async.queue; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully; + /** * {@link OrderedStreamElementQueue} specific tests. 
*/ public class OrderedStreamElementQueueTest extends TestLogger { - - private static final long timeout = 1L; - private static ExecutorService executor; - - @BeforeClass - public static void setup() { - executor = Executors.newFixedThreadPool(3); - } - - @AfterClass - public static void shutdown() { - executor.shutdown(); - - try { - if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException interrupted) { - executor.shutdownNow(); - - Thread.currentThread().interrupt(); - } - } - /** * Tests that only the head element is pulled from the ordered queue if it has been * completed. */ @Test - public void testCompletionOrder() throws Exception { - OperatorActions operatorActions = mock(OperatorActions.class); - final OrderedStreamElementQueue queue = new OrderedStreamElementQueue(4, executor, operatorActions); - - StreamRecordQueueEntry entry1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L)); - StreamRecordQueueEntry entry2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L)); - WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new Watermark(2L)); - StreamRecordQueueEntry entry4 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L)); - - List> expected = Arrays.asList(entry1, entry2, entry3, entry4); - - for (StreamElementQueueEntry entry : expected) { - queue.put(entry); - } + public void testCompletionOrder() { + final OrderedStreamElementQueue queue = new OrderedStreamElementQueue<>(4); - CompletableFuture> pollOperation = CompletableFuture.supplyAsync( - () -> { - List result = new ArrayList<>(4); - while (!queue.isEmpty()) { - try { - result.add(queue.poll()); - } catch (InterruptedException e) { - throw new CompletionException(e); - } - } - - return result; - }, - executor); - - Thread.sleep(10L); - - Assert.assertFalse(pollOperation.isDone()); - - entry2.complete(Collections.emptyList()); - - entry4.complete(Collections.emptyList()); - - Thread.sleep(10L); + ResultFuture 
entry1 = putSucessfully(queue, new StreamRecord<>(1, 0L)); + ResultFuture entry2 = putSucessfully(queue, new StreamRecord<>(2, 1L)); + putSucessfully(queue, new Watermark(2L)); + ResultFuture entry4 = putSucessfully(queue, new StreamRecord<>(3, 3L)); +
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328525320 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java ## @@ -18,110 +18,82 @@ package org.apache.flink.streaming.api.operators.async.queue; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully; + /** * {@link OrderedStreamElementQueue} specific tests. 
*/ public class OrderedStreamElementQueueTest extends TestLogger { - - private static final long timeout = 1L; - private static ExecutorService executor; - - @BeforeClass - public static void setup() { - executor = Executors.newFixedThreadPool(3); - } - - @AfterClass - public static void shutdown() { - executor.shutdown(); - - try { - if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException interrupted) { - executor.shutdownNow(); - - Thread.currentThread().interrupt(); - } - } - /** * Tests that only the head element is pulled from the ordered queue if it has been * completed. */ @Test - public void testCompletionOrder() throws Exception { - OperatorActions operatorActions = mock(OperatorActions.class); - final OrderedStreamElementQueue queue = new OrderedStreamElementQueue(4, executor, operatorActions); - - StreamRecordQueueEntry entry1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L)); - StreamRecordQueueEntry entry2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L)); - WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new Watermark(2L)); - StreamRecordQueueEntry entry4 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L)); - - List> expected = Arrays.asList(entry1, entry2, entry3, entry4); - - for (StreamElementQueueEntry entry : expected) { - queue.put(entry); - } + public void testCompletionOrder() { + final OrderedStreamElementQueue queue = new OrderedStreamElementQueue<>(4); - CompletableFuture> pollOperation = CompletableFuture.supplyAsync( - () -> { - List result = new ArrayList<>(4); - while (!queue.isEmpty()) { - try { - result.add(queue.poll()); - } catch (InterruptedException e) { - throw new CompletionException(e); - } - } - - return result; - }, - executor); - - Thread.sleep(10L); - - Assert.assertFalse(pollOperation.isDone()); - - entry2.complete(Collections.emptyList()); - - entry4.complete(Collections.emptyList()); - - Thread.sleep(10L); + ResultFuture 
entry1 = putSucessfully(queue, new StreamRecord<>(1, 0L)); + ResultFuture entry2 = putSucessfully(queue, new StreamRecord<>(2, 1L)); + putSucessfully(queue, new Watermark(2L)); + ResultFuture entry4 = putSucessfully(queue, new StreamRecord<>(3, 3L)); +
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328521088 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ## @@ -19,68 +19,56 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import java.util.Collection; +import java.util.List; +import java.util.Optional; /** - * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. + * Interface for stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue { /** -* Put the given element in the queue if capacity is left. If not, then block until this is -* the case. +* Tries to put the given element in the queue. This operation succeeds if the queue has capacity left and fails if +* the queue is full. * -* @param streamElementQueueEntry to be put into the queue -* @param Type of the entries future value -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element -*/ -void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; - - /** -* Try to put the given element in the queue. This operation succeeds if the queue has capacity -* left and fails if the queue is full. +* This method returns a handle to the inserted element that allows to set the result of the computation. 
* -* @param streamElementQueueEntry to be inserted -* @param Type of the entries future value -* @return True if the entry could be inserted; otherwise false -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element +* @param streamElement the element to be inserted. +* @return A handle to the element if successful or {@link Optional#empty()} otherwise. */ -boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; + Optional> tryPut(StreamElement streamElement); /** -* Peek at the head of the queue and return the first completed {@link AsyncResult}. This -* operation is a blocking operation and only returns once a completed async result has been -* found. +* Emits one completed element from the head of this queue into the given output. * -* @return Completed {@link AsyncResult} -* @throws InterruptedException if the current thread has been interrupted while waiting for a -* completed async result. +* Will not emit any element if no element has been completed (check {@link #hasCompletedElements()} before entering +* any critical section). +* +* @param output the output into which to emit */ - AsyncResult peekBlockingly() throws InterruptedException; + void emitCompletedElement(TimestampedCollector output); /** -* Poll the first completed {@link AsyncResult} from the head of this queue. This operation is -* blocking and only returns once a completed async result has been found. +* Checks if there is at least one completed element. Review comment: This description is not very accurate for the ordered case. In the ordered case, the completed element must be the head element of the queue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328471027 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async.queue; + +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.util.CollectorOutput; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Utility for putting elements inside a {@link StreamElementQueue}. 
+ */ +class QueueUtil { + static ResultFuture putSucessfully(StreamElementQueue queue, StreamElement streamElement) { + Optional> resultFuture = queue.tryPut(streamElement); + assertTrue(resultFuture.isPresent()); + return resultFuture.get(); + } + + static void putUnsucessfully(StreamElementQueue queue, StreamElement streamElement) { + Optional> resultFuture = queue.tryPut(streamElement); + assertFalse(resultFuture.isPresent()); + } + + /** +* Pops all completed elements from the head of this queue. +* +* @return Completed elements or empty list of none exists. +*/ + static List popCompleted(StreamElementQueue queue) { + final List completed = new ArrayList<>(); + TimestampedCollector collector = new TimestampedCollector<>(new CollectorOutput<>(completed)); + while (queue.hasCompletedElements()) { + queue.emitCompletedElement(collector); + } + collector.close(); Review comment: ATM `close` actually does nothing, but the proposed semantics of `Collector#close` are that it would flush buffered data. If so, I wonder whether it is risky to call `close` before returning, in case the `close` implementation is changed in the future to clear the list. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328471027 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async.queue; + +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.util.CollectorOutput; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Utility for putting elements inside a {@link StreamElementQueue}. 
+ */ +class QueueUtil { + static ResultFuture putSucessfully(StreamElementQueue queue, StreamElement streamElement) { + Optional> resultFuture = queue.tryPut(streamElement); + assertTrue(resultFuture.isPresent()); + return resultFuture.get(); + } + + static void putUnsucessfully(StreamElementQueue queue, StreamElement streamElement) { + Optional> resultFuture = queue.tryPut(streamElement); + assertFalse(resultFuture.isPresent()); + } + + /** +* Pops all completed elements from the head of this queue. +* +* @return Completed elements or empty list of none exists. +*/ + static List popCompleted(StreamElementQueue queue) { + final List completed = new ArrayList<>(); + TimestampedCollector collector = new TimestampedCollector<>(new CollectorOutput<>(completed)); + while (queue.hasCompletedElements()) { + queue.emitCompletedElement(collector); + } + collector.close(); Review comment: ATM `close` actually does nothing, but the proposed semantics of `Collector#close` are that it would flush buffered data. If so, I wonder whether it is risky to call `close` before returning the list, in case the `close` implementation is changed in the future to clear the list. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328471027 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async.queue; + +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.util.CollectorOutput; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Utility for putting elements inside a {@link StreamElementQueue}. 
+ */ +class QueueUtil { + static ResultFuture putSucessfully(StreamElementQueue queue, StreamElement streamElement) { + Optional> resultFuture = queue.tryPut(streamElement); + assertTrue(resultFuture.isPresent()); + return resultFuture.get(); + } + + static void putUnsucessfully(StreamElementQueue queue, StreamElement streamElement) { + Optional> resultFuture = queue.tryPut(streamElement); + assertFalse(resultFuture.isPresent()); + } + + /** +* Pops all completed elements from the head of this queue. +* +* @return Completed elements or empty list of none exists. +*/ + static List popCompleted(StreamElementQueue queue) { + final List completed = new ArrayList<>(); + TimestampedCollector collector = new TimestampedCollector<>(new CollectorOutput<>(completed)); + while (queue.hasCompletedElements()) { + queue.emitCompletedElement(collector); + } + collector.close(); Review comment: ATM `close` actually does nothing, but the proposed semantics of `Collector#close` are that it would flush buffered data. If so, I wonder whether there is a risk that the `completed` list gets cleared before it is returned, if the implementation is changed in the future. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328466285 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async.queue; + +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.util.CollectorOutput; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Utility for putting elements inside a {@link StreamElementQueue}. 
+ */ +class QueueUtil { + static ResultFuture putSucessfully(StreamElementQueue queue, StreamElement streamElement) { + Optional> resultFuture = queue.tryPut(streamElement); + assertTrue(resultFuture.isPresent()); + return resultFuture.get(); + } + + static void putUnsucessfully(StreamElementQueue queue, StreamElement streamElement) { + Optional> resultFuture = queue.tryPut(streamElement); + assertFalse(resultFuture.isPresent()); + } + + /** +* Pops all completed elements from the head of this queue. +* +* @return Completed elements or empty list of none exists. Review comment: `empty list if none exists` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328464439 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async.queue; + +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.util.CollectorOutput; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Utility for putting elements inside a {@link StreamElementQueue}. 
+ */ +class QueueUtil { + static ResultFuture putSucessfully(StreamElementQueue queue, StreamElement streamElement) { + Optional> resultFuture = queue.tryPut(streamElement); + assertTrue(resultFuture.isPresent()); + return resultFuture.get(); + } + + static void putUnsucessfully(StreamElementQueue queue, StreamElement streamElement) { Review comment: typo: sucess This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328464339 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/QueueUtil.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async.queue; + +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.util.CollectorOutput; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Utility for putting elements inside a {@link StreamElementQueue}. + */ +class QueueUtil { + static ResultFuture putSucessfully(StreamElementQueue queue, StreamElement streamElement) { Review comment: typo: Sucess -> Success This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328463014 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java ## @@ -18,110 +18,82 @@ package org.apache.flink.streaming.api.operators.async.queue; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully; + Review comment: remove extra empty line This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328462756 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ## @@ -100,6 +102,9 @@ public class AsyncWaitOperatorTest extends TestLogger { private static final long TIMEOUT = 1000L; + @Rule + public Timeout timeoutRule = new Timeout(10, TimeUnit.SECONDS); Review comment: Why do we need this change? Some previous tests were individually configured with a 2-second timeout, but now it is unified to 10 seconds for all cases. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328459063 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java ## @@ -71,4 +73,19 @@ public void eraseTimestamp() { public void close() { output.close(); } + + @Override + public void emitWatermark(Watermark mark) { + output.emitWatermark(mark); + } + + @Override + public void collect(OutputTag outputTag, StreamRecord record) { Review comment: This introduces three new methods by implementing the `Output` interface instead. But actually we only need the `emitWatermark` method, and the other two methods would never be called in the current code. If so, another option is to define only the `emitWatermark` method here and keep implementing the `Collector` interface. Or we could refactor the existing usages of `TimestampedCollector` to reuse the other two methods, to avoid touching the internally wrapped `output` directly. That might be out of the scope of this PR, or it could be a separate hotfix commit if you want. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328459063 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java ## @@ -71,4 +73,19 @@ public void eraseTimestamp() { public void close() { output.close(); } + + @Override + public void emitWatermark(Watermark mark) { + output.emitWatermark(mark); + } + + @Override + public void collect(OutputTag outputTag, StreamRecord record) { Review comment: This introduces three new methods by implementing the `Output` interface instead. But actually we only need the `emitWatermark` method, and the other two methods would never be called in the code. If so, another option is to define only the `emitWatermark` method here and keep implementing the `Collector` interface. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328422187 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. Review comment: Yes. Elements within a stage could overtake each other. One more thing for further confirmation: how do we guarantee that two stages cannot overtake each other? E.g., assume we have four stages: {a, b, c} {watermark1} {d, e, f} {watermark2}, where the first stage is {a, b, c}. If the user function completes the element `e` in the third stage first, does that mean the result of `e` would be emitted to the output before the first stage? Or did I miss something else? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328416725 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java ## @@ -200,7 +200,7 @@ private boolean processMail(TaskMailbox mailbox) throws MailboxStateException, I try { maybeLetter.get().run(); } catch (Exception e) { - e.printStackTrace(); + throw new IllegalStateException("Cannot process mail " + maybeLetter.get(), e); Review comment: Actually I am concerned about two issues: - `IllegalStateException` does not seem very accurate here. - If we want to provide more information for debugging, maybe we could use `ExceptionUtils.rethrow(e, "Cannot process mail " + maybeLetter.get());`. But I am not sure whether `maybeLetter.get()` would give any helpful info here. If the debug info is useless, it seems simpler to remove the `try...catch` directly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328416725 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java ## @@ -200,7 +200,7 @@ private boolean processMail(TaskMailbox mailbox) throws MailboxStateException, I try { maybeLetter.get().run(); } catch (Exception e) { - e.printStackTrace(); + throw new IllegalStateException("Cannot process mail " + maybeLetter.get(), e); Review comment: Actually I am concerned about two issues here: - `IllegalStateException` does not seem very accurate here. - If we want to provide more information for debugging, maybe we could use `ExceptionUtils.rethrow(e, "Cannot process mail " + maybeLetter.get());`. But I am not sure whether `maybeLetter.get()` would give any helpful info here. If the debug info is useless, it seems simpler to remove the `try...catch` directly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328414259 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java ## @@ -200,7 +200,7 @@ private boolean processMail(TaskMailbox mailbox) throws MailboxStateException, I try { maybeLetter.get().run(); } catch (Exception e) { - e.printStackTrace(); + throw new IllegalStateException("Cannot process mail " + maybeLetter.get(), e); Review comment: According to the description of `AsyncExceptionHandler`, it is intended to be used by threads other than the main task thread. AFAIK it is only used by `SplitReader` in `ContinuousFileReaderOperator` now. In this case, if `processMail` is executed in the main task thread, it may be out of the scope of `AsyncExceptionHandler`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328069833 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java ## @@ -200,7 +200,7 @@ private boolean processMail(TaskMailbox mailbox) throws MailboxStateException, I try { maybeLetter.get().run(); } catch (Exception e) { - e.printStackTrace(); + throw new IllegalStateException("Cannot process mail " + maybeLetter.get(), e); Review comment: If we want to interrupt the execution on any exception while looping over the letters, I am wondering whether this wrapped exception provides any helpful debug information. If not, we could remove the `try..catch` clauses here for simplicity. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328066958 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; - try { - if (firstSet.remove(streamElementQueueEntry)) { - completedQueue.offer(streamElementQueueEntry); + public StagedStreamRecordQueueEntry(StreamRecord inputRecord, Stage stage) { + super(inputRecord); + this.stage = stage; + } - while (firstSet.isEmpty() && firstSet != lastSet) { - firstSet = uncompletedQueue.poll(); + @Override + public void complete(Collection result) { + super.complete(result); + this.stage.completed(this); + } + } - Iterator> it = firstSet.iterator(); + /** +* A stage is a collection of queue entries that can be completed in arbitrary order. +*/ + static class Stage { + /** Undrained finished elements. 
*/ + private final Queue> completedQueue; - while (it.hasNext()) { - StreamElementQueueEntry bufferEntry = it.next(); + /** Finished and unfinished input elements. Used solely for checkpointing. */ + private final Set> pendingElements; - if (bufferEntry.isDone()) { - completedQueue.offer(bufferEntry); - it.remove(); - } - } - } + Stage(int initialCapacity) { + completedQueue = new ArrayDeque<>(initialCapacity); + pendingElements = new HashSet<>(initialCapacity); + } - LOG.debug("Signal unordered stream element queue has completed entries."); - hasCompletedEntries.signalAll(); + /** +* Signal that an entry finished computation. +*/ + void completed(StreamElementQueueEntry elementQueueEntry) { + // adding only to completed queue if not completed before + // there may be a real result coming after the actual result, which is updated in the queue entry but + // the entry is not readded to the complete queue + if (pendingElements.remove(elementQueueEntry)) { + completedQueue.add(elementQueueEntry); Review comment: In the past, after handling the current notified completed entry, it would fetch the next set and iterate to check whether there are completed entries in the next set. I do not know why it needs to do that before. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328064864 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java ## @@ -19,31 +19,44 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.Preconditions; -import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; + +import java.util.Collection; /** * {@link StreamElementQueueEntry} implementation for the {@link Watermark}. */ @Internal -public class WatermarkQueueEntry extends StreamElementQueueEntry implements AsyncWatermarkResult { +class WatermarkQueueEntry implements StreamElementQueueEntry { + @Nonnull + private final Watermark watermark; - private final CompletableFuture future; + WatermarkQueueEntry(@Nonnull Watermark watermark) { Review comment: ditto: `@Nonnull` issue. I guess it is the common sense for `Nonnull`. Then it is only necessary to tag it if `Nullable`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328060993 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; - try { - if (firstSet.remove(streamElementQueueEntry)) { - completedQueue.offer(streamElementQueueEntry); + public StagedStreamRecordQueueEntry(StreamRecord inputRecord, Stage stage) { + super(inputRecord); + this.stage = stage; + } - while (firstSet.isEmpty() && firstSet != lastSet) { - firstSet = uncompletedQueue.poll(); + @Override + public void complete(Collection result) { + super.complete(result); + this.stage.completed(this); + } + } - Iterator> it = firstSet.iterator(); + /** +* A stage is a collection of queue entries that can be completed in arbitrary order. +*/ + static class Stage { + /** Undrained finished elements. 
*/ + private final Queue> completedQueue; - while (it.hasNext()) { - StreamElementQueueEntry bufferEntry = it.next(); + /** Finished and unfinished input elements. Used solely for checkpointing. */ + private final Set> pendingElements; - if (bufferEntry.isDone()) { - completedQueue.offer(bufferEntry); - it.remove(); - } - } - } + Stage(int initialCapacity) { + completedQueue = new ArrayDeque<>(initialCapacity); + pendingElements = new HashSet<>(initialCapacity); + } - LOG.debug("Signal unordered stream element queue has completed entries."); - hasCompletedEntries.signalAll(); + /** +* Signal that an entry finished computation. +*/ + void completed(StreamElementQueueEntry elementQueueEntry) { + // adding only to completed queue if not completed before + // there may be a real result coming after the actual result, which is updated in the queue entry but + // the entry is not readded to the complete queue + if (pendingElements.remove(elementQueueEntry)) { + completedQueue.add(elementQueueEntry); } - } finally { - lock.unlock(); } - } - /** -* Add the given stream element queue entry to the current last set if it is not a watermark. -* If it is a watermark, then stop adding to the current last set, insert the watermark into its -* own set and add a new last set. -* -* @param streamElementQueueEntry to be inserted -* @param Type of the stream element queue entry's result -*/ - private void addEntry(StreamElementQueueEntry streamElementQueueEntry) { - assert(lock.isHeldByCurrentThread()); + /** +*
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328059924 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; - try { - if (firstSet.remove(streamElementQueueEntry)) { - completedQueue.offer(streamElementQueueEntry); + public StagedStreamRecordQueueEntry(StreamRecord inputRecord, Stage stage) { + super(inputRecord); + this.stage = stage; + } - while (firstSet.isEmpty() && firstSet != lastSet) { - firstSet = uncompletedQueue.poll(); + @Override + public void complete(Collection result) { + super.complete(result); + this.stage.completed(this); + } + } - Iterator> it = firstSet.iterator(); + /** +* A stage is a collection of queue entries that can be completed in arbitrary order. +*/ + static class Stage { + /** Undrained finished elements. 
*/ + private final Queue> completedQueue; - while (it.hasNext()) { - StreamElementQueueEntry bufferEntry = it.next(); + /** Finished and unfinished input elements. Used solely for checkpointing. */ + private final Set> pendingElements; - if (bufferEntry.isDone()) { - completedQueue.offer(bufferEntry); - it.remove(); - } - } - } + Stage(int initialCapacity) { + completedQueue = new ArrayDeque<>(initialCapacity); + pendingElements = new HashSet<>(initialCapacity); + } - LOG.debug("Signal unordered stream element queue has completed entries."); - hasCompletedEntries.signalAll(); + /** +* Signal that an entry finished computation. Review comment: nit: Signal -> Signals This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328059856 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; - try { - if (firstSet.remove(streamElementQueueEntry)) { - completedQueue.offer(streamElementQueueEntry); + public StagedStreamRecordQueueEntry(StreamRecord inputRecord, Stage stage) { + super(inputRecord); + this.stage = stage; + } - while (firstSet.isEmpty() && firstSet != lastSet) { - firstSet = uncompletedQueue.poll(); + @Override + public void complete(Collection result) { + super.complete(result); + this.stage.completed(this); + } + } - Iterator> it = firstSet.iterator(); + /** +* A stage is a collection of queue entries that can be completed in arbitrary order. +*/ + static class Stage { + /** Undrained finished elements. 
*/ + private final Queue> completedQueue; - while (it.hasNext()) { - StreamElementQueueEntry bufferEntry = it.next(); + /** Finished and unfinished input elements. Used solely for checkpointing. */ Review comment: Remove `Finished` from this comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328059752 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; - try { - if (firstSet.remove(streamElementQueueEntry)) { - completedQueue.offer(streamElementQueueEntry); + public StagedStreamRecordQueueEntry(StreamRecord inputRecord, Stage stage) { + super(inputRecord); + this.stage = stage; + } - while (firstSet.isEmpty() && firstSet != lastSet) { - firstSet = uncompletedQueue.poll(); + @Override + public void complete(Collection result) { + super.complete(result); + this.stage.completed(this); + } + } - Iterator> it = firstSet.iterator(); + /** +* A stage is a collection of queue entries that can be completed in arbitrary order. +*/ + static class Stage { + /** Undrained finished elements. 
*/ + private final Queue> completedQueue; - while (it.hasNext()) { - StreamElementQueueEntry bufferEntry = it.next(); + /** Finished and unfinished input elements. Used solely for checkpointing. */ + private final Set> pendingElements; Review comment: I think it would be better to rename this to `uncompletedElements`, because the term `pending` in `addPendingElements` below covers both completed and uncompleted elements. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328056484 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. Review comment: The previous description of the ordering guarantees is not very clear to me. In particular, in `no stream record can be overtaken by a watermark and no watermark can overtake a stream record`, the statements before and after `and` say the same thing. I think it would be better to introduce the stage concept here for easier understanding. If I understood correctly, elements within one stage are kept in order, and elements in different stages are not ordered, which means they can overtake each other. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327964157 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. + * This means that no stream record can be overtaken by a watermark and no watermark can overtake a stream record. + * However, stream records falling in the same segment between two watermarks can overtake each other (their emission + * order is not guaranteed). */ @Internal -public class UnorderedStreamElementQueue implements StreamElementQueue { +public final class UnorderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; - /** Executor to run the onComplete callbacks. */ - private final Executor executor; + /** Queue of queue entries segmented by watermarks. */ + private final Deque stages; - /** OperatorActions to signal the owning operator a failure. */ - private final OperatorActions operatorActions; - - /** Queue of uncompleted stream element queue entries segmented by watermarks. */ - private final ArrayDeque>> uncompletedQueue; - - /** Queue of completed stream element queue entries. */ - private final ArrayDeque> completedQueue; - - /** First (chronologically oldest) uncompleted set of stream element queue entries. */ - private Set> firstSet; - - // Last (chronologically youngest) uncompleted set of stream element queue entries. New - // stream element queue entries are inserted into this set. - private Set> lastSet; - private volatile int numberEntries; - - /** Locks and conditions for the blocking queue. */ - private final ReentrantLock lock; - private final Condition notFull; - private final Condition hasCompletedEntries; - - public UnorderedStreamElementQueue( - int capacity, - Executor executor, - OperatorActions operatorActions) { + private int numberEntries; Review comment: numberEntries -> numberOfPendingEntries? 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327976682 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. + * This means that no stream record can be overtaken by a watermark and no watermark can overtake a stream record. + * However, stream records falling in the same segment between two watermarks can overtake each other (their emission + * order is not guaranteed). */ @Internal -public class UnorderedStreamElementQueue implements StreamElementQueue { +public final class UnorderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; - /** Executor to run the onComplete callbacks. */ - private final Executor executor; + /** Queue of queue entries segmented by watermarks. */ + private final Deque stages; - /** OperatorActions to signal the owning operator a failure. */ - private final OperatorActions operatorActions; - - /** Queue of uncompleted stream element queue entries segmented by watermarks. */ - private final ArrayDeque>> uncompletedQueue; - - /** Queue of completed stream element queue entries. */ - private final ArrayDeque> completedQueue; - - /** First (chronologically oldest) uncompleted set of stream element queue entries. */ - private Set> firstSet; - - // Last (chronologically youngest) uncompleted set of stream element queue entries. New - // stream element queue entries are inserted into this set. - private Set> lastSet; - private volatile int numberEntries; - - /** Locks and conditions for the blocking queue. 
*/ - private final ReentrantLock lock; - private final Condition notFull; - private final Condition hasCompletedEntries; - - public UnorderedStreamElementQueue( - int capacity, - Executor executor, - OperatorActions operatorActions) { + private int numberEntries; + public UnorderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); - this.capacity = capacity; - - this.executor = Preconditions.checkNotNull(executor, "executor"); - - this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions"); - - this.uncompletedQueue = new ArrayDeque<>(capacity); - this.completedQueue = new ArrayDeque<>(capacity); - - this.firstSet = new HashSet<>(capacity); - this.lastSet = firstSet; + this.capacity = capacity; + // most likely scenario are 4 stages + this.stages = new ArrayDeque<>(4); this.numberEntries = 0; - -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327976571 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. + * This means that no stream record can be overtaken by a watermark and no watermark can overtake a stream record. + * However, stream records falling in the same segment between two watermarks can overtake each other (their emission + * order is not guaranteed). */ @Internal -public class UnorderedStreamElementQueue implements StreamElementQueue { +public final class UnorderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; - /** Executor to run the onComplete callbacks. */ - private final Executor executor; + /** Queue of queue entries segmented by watermarks. */ + private final Deque stages; - /** OperatorActions to signal the owning operator a failure. */ - private final OperatorActions operatorActions; - - /** Queue of uncompleted stream element queue entries segmented by watermarks. */ - private final ArrayDeque>> uncompletedQueue; - - /** Queue of completed stream element queue entries. */ - private final ArrayDeque> completedQueue; - - /** First (chronologically oldest) uncompleted set of stream element queue entries. */ - private Set> firstSet; - - // Last (chronologically youngest) uncompleted set of stream element queue entries. New - // stream element queue entries are inserted into this set. - private Set> lastSet; - private volatile int numberEntries; - - /** Locks and conditions for the blocking queue. 
*/ - private final ReentrantLock lock; - private final Condition notFull; - private final Condition hasCompletedEntries; - - public UnorderedStreamElementQueue( - int capacity, - Executor executor, - OperatorActions operatorActions) { + private int numberEntries; + public UnorderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); - this.capacity = capacity; - - this.executor = Preconditions.checkNotNull(executor, "executor"); - - this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions"); - - this.uncompletedQueue = new ArrayDeque<>(capacity); - this.completedQueue = new ArrayDeque<>(capacity); - - this.firstSet = new HashSet<>(capacity); - this.lastSet = firstSet; + this.capacity = capacity; + // most likely scenario are 4 stages + this.stages = new ArrayDeque<>(4); this.numberEntries = 0; - -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327976364 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. + * This means that no stream record can be overtaken by a watermark and no watermark can overtake a stream record. + * However, stream records falling in the same segment between two watermarks can overtake each other (their emission + * order is not guaranteed). */ @Internal -public class UnorderedStreamElementQueue implements StreamElementQueue { +public final class UnorderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; - /** Executor to run the onComplete callbacks. */ - private final Executor executor; + /** Queue of queue entries segmented by watermarks. */ + private final Deque stages; Review comment: `Deque<Stage<OUT>>` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327976364 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. + * This means that no stream record can be overtaken by a watermark and no watermark can overtake a stream record. + * However, stream records falling in the same segment between two watermarks can overtake each other (their emission + * order is not guaranteed). */ @Internal -public class UnorderedStreamElementQueue implements StreamElementQueue { +public final class UnorderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; - /** Executor to run the onComplete callbacks. */ - private final Executor executor; + /** Queue of queue entries segmented by watermarks. */ + private final Deque stages; Review comment: `Deque<Stage<OUT>>` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327975397 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. + * This means that no stream record can be overtaken by a watermark and no watermark can overtake a stream record. + * However, stream records falling in the same segment between two watermarks can overtake each other (their emission + * order is not guaranteed). */ @Internal -public class UnorderedStreamElementQueue implements StreamElementQueue { +public final class UnorderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; - /** Executor to run the onComplete callbacks. */ - private final Executor executor; + /** Queue of queue entries segmented by watermarks. */ + private final Deque stages; - /** OperatorActions to signal the owning operator a failure. */ - private final OperatorActions operatorActions; - - /** Queue of uncompleted stream element queue entries segmented by watermarks. */ - private final ArrayDeque>> uncompletedQueue; - - /** Queue of completed stream element queue entries. */ - private final ArrayDeque> completedQueue; - - /** First (chronologically oldest) uncompleted set of stream element queue entries. */ - private Set> firstSet; - - // Last (chronologically youngest) uncompleted set of stream element queue entries. New - // stream element queue entries are inserted into this set. - private Set> lastSet; - private volatile int numberEntries; - - /** Locks and conditions for the blocking queue. 
*/ - private final ReentrantLock lock; - private final Condition notFull; - private final Condition hasCompletedEntries; - - public UnorderedStreamElementQueue( - int capacity, - Executor executor, - OperatorActions operatorActions) { + private int numberEntries; + public UnorderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); - this.capacity = capacity; - - this.executor = Preconditions.checkNotNull(executor, "executor"); - - this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions"); - - this.uncompletedQueue = new ArrayDeque<>(capacity); - this.completedQueue = new ArrayDeque<>(capacity); - - this.firstSet = new HashSet<>(capacity); - this.lastSet = firstSet; + this.capacity = capacity; + // most likely scenario are 4 stages + this.stages = new ArrayDeque<>(4); this.numberEntries = 0; - -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327975499 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. + * This means that no stream record can be overtaken by a watermark and no watermark can overtake a stream record. + * However, stream records falling in the same segment between two watermarks can overtake each other (their emission + * order is not guaranteed). */ @Internal -public class UnorderedStreamElementQueue implements StreamElementQueue { +public final class UnorderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; - /** Executor to run the onComplete callbacks. */ - private final Executor executor; + /** Queue of queue entries segmented by watermarks. */ + private final Deque stages; - /** OperatorActions to signal the owning operator a failure. */ - private final OperatorActions operatorActions; - - /** Queue of uncompleted stream element queue entries segmented by watermarks. */ - private final ArrayDeque>> uncompletedQueue; - - /** Queue of completed stream element queue entries. */ - private final ArrayDeque> completedQueue; - - /** First (chronologically oldest) uncompleted set of stream element queue entries. */ - private Set> firstSet; - - // Last (chronologically youngest) uncompleted set of stream element queue entries. New - // stream element queue entries are inserted into this set. - private Set> lastSet; - private volatile int numberEntries; - - /** Locks and conditions for the blocking queue. 
*/ - private final ReentrantLock lock; - private final Condition notFull; - private final Condition hasCompletedEntries; - - public UnorderedStreamElementQueue( - int capacity, - Executor executor, - OperatorActions operatorActions) { + private int numberEntries; + public UnorderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); - this.capacity = capacity; - - this.executor = Preconditions.checkNotNull(executor, "executor"); - - this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions"); - - this.uncompletedQueue = new ArrayDeque<>(capacity); - this.completedQueue = new ArrayDeque<>(capacity); - - this.firstSet = new HashSet<>(capacity); - this.lastSet = firstSet; + this.capacity = capacity; + // most likely scenario are 4 stages + this.stages = new ArrayDeque<>(4); this.numberEntries = 0; - -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327974424 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; - try { - if (firstSet.remove(streamElementQueueEntry)) { - completedQueue.offer(streamElementQueueEntry); + public StagedStreamRecordQueueEntry(StreamRecord inputRecord, Stage stage) { + super(inputRecord); + this.stage = stage; + } - while (firstSet.isEmpty() && firstSet != lastSet) { - firstSet = uncompletedQueue.poll(); + @Override + public void complete(Collection result) { + super.complete(result); + this.stage.completed(this); + } + } - Iterator> it = firstSet.iterator(); + /** +* A stage is a collection of queue entries that can be completed in arbitrary order. +*/ + static class Stage { + /** Undrained finished elements. 
*/ + private final Queue> completedQueue; - while (it.hasNext()) { - StreamElementQueueEntry bufferEntry = it.next(); + /** Finished and unfinished input elements. Used solely for checkpointing. */ + private final Set> pendingElements; - if (bufferEntry.isDone()) { - completedQueue.offer(bufferEntry); - it.remove(); - } - } - } + Stage(int initialCapacity) { + completedQueue = new ArrayDeque<>(initialCapacity); + pendingElements = new HashSet<>(initialCapacity); + } - LOG.debug("Signal unordered stream element queue has completed entries."); - hasCompletedEntries.signalAll(); + /** +* Signal that an entry finished computation. +*/ + void completed(StreamElementQueueEntry elementQueueEntry) { + // adding only to completed queue if not completed before + // there may be a real result coming after the actual result, which is updated in the queue entry but + // the entry is not readded to the complete queue + if (pendingElements.remove(elementQueueEntry)) { + completedQueue.add(elementQueueEntry); } - } finally { - lock.unlock(); } - } - /** -* Add the given stream element queue entry to the current last set if it is not a watermark. -* If it is a watermark, then stop adding to the current last set, insert the watermark into its -* own set and add a new last set. -* -* @param streamElementQueueEntry to be inserted -* @param Type of the stream element queue entry's result -*/ - private void addEntry(StreamElementQueueEntry streamElementQueueEntry) { - assert(lock.isHeldByCurrentThread()); + /** +*
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327974182 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; - try { - if (firstSet.remove(streamElementQueueEntry)) { - completedQueue.offer(streamElementQueueEntry); + public StagedStreamRecordQueueEntry(StreamRecord inputRecord, Stage stage) { + super(inputRecord); + this.stage = stage; + } - while (firstSet.isEmpty() && firstSet != lastSet) { - firstSet = uncompletedQueue.poll(); + @Override + public void complete(Collection result) { + super.complete(result); + this.stage.completed(this); + } + } - Iterator> it = firstSet.iterator(); + /** +* A stage is a collection of queue entries that can be completed in arbitrary order. +*/ + static class Stage { + /** Undrained finished elements. 
*/ + private final Queue> completedQueue; - while (it.hasNext()) { - StreamElementQueueEntry bufferEntry = it.next(); + /** Finished and unfinished input elements. Used solely for checkpointing. */ + private final Set> pendingElements; - if (bufferEntry.isDone()) { - completedQueue.offer(bufferEntry); - it.remove(); - } - } - } + Stage(int initialCapacity) { + completedQueue = new ArrayDeque<>(initialCapacity); + pendingElements = new HashSet<>(initialCapacity); + } - LOG.debug("Signal unordered stream element queue has completed entries."); - hasCompletedEntries.signalAll(); + /** +* Signal that an entry finished computation. +*/ + void completed(StreamElementQueueEntry elementQueueEntry) { + // adding only to completed queue if not completed before + // there may be a real result coming after the actual result, which is updated in the queue entry but + // the entry is not readded to the complete queue Review comment: typo: readded This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327973730 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; Review comment: `Stage<OUT>` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327973356 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; - try { - if (firstSet.remove(streamElementQueueEntry)) { - completedQueue.offer(streamElementQueueEntry); + public StagedStreamRecordQueueEntry(StreamRecord inputRecord, Stage stage) { Review comment: package private This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327973260 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; - try { - if (firstSet.remove(streamElementQueueEntry)) { - completedQueue.offer(streamElementQueueEntry); + public StagedStreamRecordQueueEntry(StreamRecord inputRecord, Stage stage) { + super(inputRecord); + this.stage = stage; + } - while (firstSet.isEmpty() && firstSet != lastSet) { - firstSet = uncompletedQueue.poll(); + @Override + public void complete(Collection result) { + super.complete(result); + this.stage.completed(this); Review comment: remove `this` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327970135 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -218,88 +172,99 @@ public int size() { } /** -* Callback for onComplete events for the given stream element queue entry. Whenever a queue -* entry is completed, it is checked whether this entry belongs to the first set. If this is the -* case, then the element is added to the completed entries queue from where it can be consumed. -* If the first set becomes empty, then the next set is polled from the uncompleted entries -* queue. Completed entries from this new set are then added to the completed entries queue. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread has been interrupted while performing the -* on complete callback. +* An entry that notifies the respective stage upon completion. */ - public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); + static class StagedStreamRecordQueueEntry extends StreamRecordQueueEntry { + private final Stage stage; - try { - if (firstSet.remove(streamElementQueueEntry)) { - completedQueue.offer(streamElementQueueEntry); + public StagedStreamRecordQueueEntry(StreamRecord inputRecord, Stage stage) { + super(inputRecord); + this.stage = stage; + } - while (firstSet.isEmpty() && firstSet != lastSet) { - firstSet = uncompletedQueue.poll(); + @Override + public void complete(Collection result) { + super.complete(result); + this.stage.completed(this); + } + } - Iterator> it = firstSet.iterator(); + /** +* A stage is a collection of queue entries that can be completed in arbitrary order. +*/ + static class Stage { + /** Undrained finished elements. 
*/ + private final Queue<StreamElementQueueEntry<OUT>> completedQueue; Review comment: completedQueue -> completedElements This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327964157 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java ## @@ -19,192 +19,146 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** - * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue - * emits asynchronous results as soon as they are completed. Additionally it maintains the - * watermark-stream record order. This means that no stream record can be overtaken by a watermark - * and no watermark can overtake a stream record. However, stream records falling in the same - * segment between two watermarks can overtake each other (their emission order is not guaranteed). + * Unordered implementation of the {@link StreamElementQueue}. 
The unordered stream element queue provides + * asynchronous results as soon as they are completed. Additionally it maintains the watermark-stream record order. + * This means that no stream record can be overtaken by a watermark and no watermark can overtake a stream record. + * However, stream records falling in the same segment between two watermarks can overtake each other (their emission + * order is not guaranteed). */ @Internal -public class UnorderedStreamElementQueue implements StreamElementQueue { +public final class UnorderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; - /** Executor to run the onComplete callbacks. */ - private final Executor executor; + /** Queue of queue entries segmented by watermarks. */ + private final Deque stages; - /** OperatorActions to signal the owning operator a failure. */ - private final OperatorActions operatorActions; - - /** Queue of uncompleted stream element queue entries segmented by watermarks. */ - private final ArrayDeque>> uncompletedQueue; - - /** Queue of completed stream element queue entries. */ - private final ArrayDeque> completedQueue; - - /** First (chronologically oldest) uncompleted set of stream element queue entries. */ - private Set> firstSet; - - // Last (chronologically youngest) uncompleted set of stream element queue entries. New - // stream element queue entries are inserted into this set. - private Set> lastSet; - private volatile int numberEntries; - - /** Locks and conditions for the blocking queue. */ - private final ReentrantLock lock; - private final Condition notFull; - private final Condition hasCompletedEntries; - - public UnorderedStreamElementQueue( - int capacity, - Executor executor, - OperatorActions operatorActions) { + private int numberEntries; Review comment: numberEntries -> numberOfEntries? 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327962020 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java ## @@ -21,65 +21,55 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; import java.util.Collection; -import java.util.concurrent.CompletableFuture; /** * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts * as the {@link ResultFuture} implementation which is given to the {@link AsyncFunction}. The * async function completes this class with a collection of results. * - * @param Type of the asynchronous collection result + * @param Type of the asynchronous collection result. */ @Internal -public class StreamRecordQueueEntry extends StreamElementQueueEntry> - implements AsyncCollectionResult, ResultFuture { - - /** Timestamp information. */ - private final boolean hasTimestamp; - private final long timestamp; - - /** Future containing the collection result. 
*/ - private final CompletableFuture> resultFuture; +class StreamRecordQueueEntry implements StreamElementQueueEntry { + private Collection completed; - public StreamRecordQueueEntry(StreamRecord streamRecord) { - super(streamRecord); + @Nonnull + private final StreamRecord inputRecord; - hasTimestamp = streamRecord.hasTimestamp(); - timestamp = streamRecord.getTimestamp(); - - resultFuture = new CompletableFuture<>(); + StreamRecordQueueEntry(@Nonnull StreamRecord inputRecord) { + this.inputRecord = Preconditions.checkNotNull(inputRecord); } @Override - public boolean hasTimestamp() { - return hasTimestamp; + public boolean isDone() { + return completed != null; } + @Nonnull @Override - public long getTimestamp() { - return timestamp; + public StreamRecord getInputElement() { + return inputRecord; } @Override - public Collection get() throws Exception { - return resultFuture.get(); - } + public void emitResult(TimestampedCollector output) { + Preconditions.checkState(completed != null, "Not done yet"); - @Override - protected CompletableFuture> getFuture() { - return resultFuture; + output.setTimestamp(this.inputRecord); + for (OUT r : completed) { + output.collect(r); + } } @Override public void complete(Collection result) { - resultFuture.complete(result); - } - - @Override - public void completeExceptionally(Throwable error) { - resultFuture.completeExceptionally(error); + this.completed = result; Review comment: ditto: `this`. Do we need to `checkNotNull(result)`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327962005 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java ## @@ -21,65 +21,55 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; import java.util.Collection; -import java.util.concurrent.CompletableFuture; /** * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts * as the {@link ResultFuture} implementation which is given to the {@link AsyncFunction}. The * async function completes this class with a collection of results. * - * @param Type of the asynchronous collection result + * @param Type of the asynchronous collection result. */ @Internal -public class StreamRecordQueueEntry extends StreamElementQueueEntry> - implements AsyncCollectionResult, ResultFuture { - - /** Timestamp information. */ - private final boolean hasTimestamp; - private final long timestamp; - - /** Future containing the collection result. 
*/ - private final CompletableFuture> resultFuture; +class StreamRecordQueueEntry implements StreamElementQueueEntry { + private Collection completed; - public StreamRecordQueueEntry(StreamRecord streamRecord) { - super(streamRecord); + @Nonnull + private final StreamRecord inputRecord; - hasTimestamp = streamRecord.hasTimestamp(); - timestamp = streamRecord.getTimestamp(); - - resultFuture = new CompletableFuture<>(); + StreamRecordQueueEntry(@Nonnull StreamRecord inputRecord) { + this.inputRecord = Preconditions.checkNotNull(inputRecord); } @Override - public boolean hasTimestamp() { - return hasTimestamp; + public boolean isDone() { + return completed != null; } + @Nonnull @Override - public long getTimestamp() { - return timestamp; + public StreamRecord getInputElement() { + return inputRecord; } @Override - public Collection get() throws Exception { - return resultFuture.get(); - } + public void emitResult(TimestampedCollector output) { + Preconditions.checkState(completed != null, "Not done yet"); - @Override - protected CompletableFuture> getFuture() { - return resultFuture; + output.setTimestamp(this.inputRecord); Review comment: Remove `this` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327961954 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java ## @@ -21,65 +21,55 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; import java.util.Collection; -import java.util.concurrent.CompletableFuture; /** * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts * as the {@link ResultFuture} implementation which is given to the {@link AsyncFunction}. The * async function completes this class with a collection of results. * - * @param Type of the asynchronous collection result + * @param Type of the asynchronous collection result. */ @Internal -public class StreamRecordQueueEntry extends StreamElementQueueEntry> - implements AsyncCollectionResult, ResultFuture { - - /** Timestamp information. */ - private final boolean hasTimestamp; - private final long timestamp; - - /** Future containing the collection result. */ - private final CompletableFuture> resultFuture; +class StreamRecordQueueEntry implements StreamElementQueueEntry { + private Collection completed; Review comment: It would be better to give `completed` a more descriptive name, so that the related processing that follows is easier to understand. `completedOutputs/completedResults`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327958374 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java ## @@ -21,65 +21,55 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; import java.util.Collection; -import java.util.concurrent.CompletableFuture; /** * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts * as the {@link ResultFuture} implementation which is given to the {@link AsyncFunction}. The * async function completes this class with a collection of results. * - * @param Type of the asynchronous collection result + * @param Type of the asynchronous collection result. */ @Internal -public class StreamRecordQueueEntry extends StreamElementQueueEntry> - implements AsyncCollectionResult, ResultFuture { - - /** Timestamp information. */ - private final boolean hasTimestamp; - private final long timestamp; - - /** Future containing the collection result. 
*/ - private final CompletableFuture> resultFuture; +class StreamRecordQueueEntry implements StreamElementQueueEntry { + private Collection completed; - public StreamRecordQueueEntry(StreamRecord streamRecord) { - super(streamRecord); + @Nonnull + private final StreamRecord inputRecord; - hasTimestamp = streamRecord.hasTimestamp(); - timestamp = streamRecord.getTimestamp(); - - resultFuture = new CompletableFuture<>(); + StreamRecordQueueEntry(@Nonnull StreamRecord inputRecord) { Review comment: We could remove `@Nonnull` because the below `Preconditions.checkNotNull` has the same effect. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327957126 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java ## @@ -19,79 +19,47 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import org.apache.flink.util.Preconditions; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.function.Consumer; +import javax.annotation.Nonnull; /** - * Entry class for the {@link StreamElementQueue}. The stream element queue entry stores the - * {@link StreamElement} for which the stream element queue entry has been instantiated. - * Furthermore, it allows to register callbacks for when the queue entry is completed. - * - * @param Type of the result + * An entry for the {@link StreamElementQueue}. The stream element queue entry stores the {@link StreamElement} for + * which the stream element queue entry has been instantiated. + * Furthermore, it allows to set the result of a completed entry through {@link ResultFuture}. */ @Internal -public abstract class StreamElementQueueEntry implements AsyncResult { - - private final StreamElement streamElement; - - public StreamElementQueueEntry(StreamElement streamElement) { - this.streamElement = Preconditions.checkNotNull(streamElement); - } - - public StreamElement getStreamElement() { - return streamElement; - } +interface StreamElementQueueEntry extends ResultFuture { /** * True if the stream element queue entry has been completed; otherwise false. 
* * @return True if the stream element queue entry has been completed; otherwise false. */ - public boolean isDone() { - return getFuture().isDone(); - } + boolean isDone(); /** -* Register the given complete function to be called once this queue entry has been completed. +* Emits the results associated with this queue entry. * -* @param completeFunction to call when the queue entry has been completed -* @param executor to run the complete function +* @param output the output into which to emit. +* @throws IllegalStateException if this entry is not done yet. Review comment: We cannot see this exception being thrown explicitly here, so we might not need this javadoc entry. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954961 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ## @@ -19,68 +19,56 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import java.util.Collection; +import java.util.List; +import java.util.Optional; /** - * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. + * Interface for a non-blocking stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue { /** -* Put the given element in the queue if capacity is left. If not, then block until this is -* the case. +* Try to put the given element in the queue. This operation succeeds if the queue has capacity left and fails if +* the queue is full. * -* @param streamElementQueueEntry to be put into the queue -* @param Type of the entries future value -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element -*/ -void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; - - /** -* Try to put the given element in the queue. This operation succeeds if the queue has capacity -* left and fails if the queue is full. +* This method returns a handle to the inserted element that allows to set the result of the computation. 
* -* @param streamElementQueueEntry to be inserted -* @param Type of the entries future value -* @return True if the entry could be inserted; otherwise false -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element +* @param streamElement the element to be inserted. +* @return A handle to the element if successful or {@link Optional#empty()} otherwise. */ -boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; + Optional> tryPut(StreamElement streamElement); /** -* Peek at the head of the queue and return the first completed {@link AsyncResult}. This -* operation is a blocking operation and only returns once a completed async result has been -* found. +* Emits one completed element from the head of this queue into the given output. * -* @return Completed {@link AsyncResult} -* @throws InterruptedException if the current thread has been interrupted while waiting for a -* completed async result. +* Will not emit any element if no element has been completed (check {@link #hasCompleted()} before entering +* any critical section). Review comment: ditto: `` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327955120 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ## @@ -19,68 +19,56 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import java.util.Collection; +import java.util.List; +import java.util.Optional; /** - * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. + * Interface for a non-blocking stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue { /** -* Put the given element in the queue if capacity is left. If not, then block until this is -* the case. +* Try to put the given element in the queue. This operation succeeds if the queue has capacity left and fails if +* the queue is full. * -* @param streamElementQueueEntry to be put into the queue -* @param Type of the entries future value -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element -*/ -void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; - - /** -* Try to put the given element in the queue. This operation succeeds if the queue has capacity -* left and fails if the queue is full. +* This method returns a handle to the inserted element that allows to set the result of the computation. 
* -* @param streamElementQueueEntry to be inserted -* @param Type of the entries future value -* @return True if the entry could be inserted; otherwise false -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element +* @param streamElement the element to be inserted. +* @return A handle to the element if successful or {@link Optional#empty()} otherwise. */ -boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; + Optional> tryPut(StreamElement streamElement); /** -* Peek at the head of the queue and return the first completed {@link AsyncResult}. This -* operation is a blocking operation and only returns once a completed async result has been -* found. +* Emits one completed element from the head of this queue into the given output. * -* @return Completed {@link AsyncResult} -* @throws InterruptedException if the current thread has been interrupted while waiting for a -* completed async result. +* Will not emit any element if no element has been completed (check {@link #hasCompleted()} before entering +* any critical section). +* +* @param output the output into which to emit */ - AsyncResult peekBlockingly() throws InterruptedException; + void emitCompleted(TimestampedCollector output); /** -* Poll the first completed {@link AsyncResult} from the head of this queue. This operation is -* blocking and only returns once a completed async result has been found. +* Checks if there is at least one completed element to be emitted. * -* @return Completed {@link AsyncResult} which has been removed from the queue -* @throws InterruptedException if the current thread has been interrupted while waiting for a -* completed async result. +* @return true if there is an element to be emitted. */ - AsyncResult poll() throws InterruptedException; + boolean hasCompleted(); /** -* Return the collection of {@link StreamElementQueueEntry} currently contained in this queue. 
+* Returns the collection of {@link StreamElement} currently contained in this queue for checkpointing. +* +* This includes all non-emitted, completed and non-completed elements. Review comment: ditto: `` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards,
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327955101 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ## @@ -19,68 +19,56 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import java.util.Collection; +import java.util.List; +import java.util.Optional; /** - * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. + * Interface for a non-blocking stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue { /** -* Put the given element in the queue if capacity is left. If not, then block until this is -* the case. +* Try to put the given element in the queue. This operation succeeds if the queue has capacity left and fails if +* the queue is full. * -* @param streamElementQueueEntry to be put into the queue -* @param Type of the entries future value -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element -*/ -void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; - - /** -* Try to put the given element in the queue. This operation succeeds if the queue has capacity -* left and fails if the queue is full. +* This method returns a handle to the inserted element that allows to set the result of the computation. 
* -* @param streamElementQueueEntry to be inserted -* @param Type of the entries future value -* @return True if the entry could be inserted; otherwise false -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element +* @param streamElement the element to be inserted. +* @return A handle to the element if successful or {@link Optional#empty()} otherwise. */ -boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; + Optional> tryPut(StreamElement streamElement); /** -* Peek at the head of the queue and return the first completed {@link AsyncResult}. This -* operation is a blocking operation and only returns once a completed async result has been -* found. +* Emits one completed element from the head of this queue into the given output. * -* @return Completed {@link AsyncResult} -* @throws InterruptedException if the current thread has been interrupted while waiting for a -* completed async result. +* Will not emit any element if no element has been completed (check {@link #hasCompleted()} before entering +* any critical section). +* +* @param output the output into which to emit */ - AsyncResult peekBlockingly() throws InterruptedException; + void emitCompleted(TimestampedCollector output); /** -* Poll the first completed {@link AsyncResult} from the head of this queue. This operation is -* blocking and only returns once a completed async result has been found. +* Checks if there is at least one completed element to be emitted. * -* @return Completed {@link AsyncResult} which has been removed from the queue -* @throws InterruptedException if the current thread has been interrupted while waiting for a -* completed async result. +* @return true if there is an element to be emitted. Review comment: `true if there is a completed element.` This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327955120 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ## @@ -19,68 +19,56 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import java.util.Collection; +import java.util.List; +import java.util.Optional; /** - * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. + * Interface for a non-blocking stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue { /** -* Put the given element in the queue if capacity is left. If not, then block until this is -* the case. +* Try to put the given element in the queue. This operation succeeds if the queue has capacity left and fails if +* the queue is full. * -* @param streamElementQueueEntry to be put into the queue -* @param Type of the entries future value -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element -*/ -void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; - - /** -* Try to put the given element in the queue. This operation succeeds if the queue has capacity -* left and fails if the queue is full. +* This method returns a handle to the inserted element that allows to set the result of the computation. 
* -* @param streamElementQueueEntry to be inserted -* @param Type of the entries future value -* @return True if the entry could be inserted; otherwise false -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element +* @param streamElement the element to be inserted. +* @return A handle to the element if successful or {@link Optional#empty()} otherwise. */ -boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; + Optional> tryPut(StreamElement streamElement); /** -* Peek at the head of the queue and return the first completed {@link AsyncResult}. This -* operation is a blocking operation and only returns once a completed async result has been -* found. +* Emits one completed element from the head of this queue into the given output. * -* @return Completed {@link AsyncResult} -* @throws InterruptedException if the current thread has been interrupted while waiting for a -* completed async result. +* Will not emit any element if no element has been completed (check {@link #hasCompleted()} before entering +* any critical section). +* +* @param output the output into which to emit */ - AsyncResult peekBlockingly() throws InterruptedException; + void emitCompleted(TimestampedCollector output); /** -* Poll the first completed {@link AsyncResult} from the head of this queue. This operation is -* blocking and only returns once a completed async result has been found. +* Checks if there is at least one completed element to be emitted. * -* @return Completed {@link AsyncResult} which has been removed from the queue -* @throws InterruptedException if the current thread has been interrupted while waiting for a -* completed async result. +* @return true if there is an element to be emitted. */ - AsyncResult poll() throws InterruptedException; + boolean hasCompleted(); /** -* Return the collection of {@link StreamElementQueueEntry} currently contained in this queue. 
+* Returns the collection of {@link StreamElement} currently contained in this queue for checkpointing. +* +* This includes all non-emitted, completed and non-completed elements. Review comment: ditto: This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954988 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ## @@ -19,68 +19,56 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import java.util.Collection; +import java.util.List; +import java.util.Optional; /** - * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. + * Interface for a non-blocking stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue { /** -* Put the given element in the queue if capacity is left. If not, then block until this is -* the case. +* Try to put the given element in the queue. This operation succeeds if the queue has capacity left and fails if +* the queue is full. * -* @param streamElementQueueEntry to be put into the queue -* @param Type of the entries future value -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element -*/ -void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; - - /** -* Try to put the given element in the queue. This operation succeeds if the queue has capacity -* left and fails if the queue is full. +* This method returns a handle to the inserted element that allows to set the result of the computation. 
* * @param streamElementQueueEntry to be inserted -* @param Type of the entries future value -* @return True if the entry could be inserted; otherwise false -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element +* @param streamElement the element to be inserted. +* @return A handle to the element if successful or {@link Optional#empty()} otherwise. */ -boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; + Optional> tryPut(StreamElement streamElement); /** -* Peek at the head of the queue and return the first completed {@link AsyncResult}. This -* operation is a blocking operation and only returns once a completed async result has been -* found. +* Emits one completed element from the head of this queue into the given output. * -* @return Completed {@link AsyncResult} -* @throws InterruptedException if the current thread has been interrupted while waiting for a -* completed async result. +* Will not emit any element if no element has been completed (check {@link #hasCompleted()} before entering +* any critical section). +* +* @param output the output into which to emit */ - AsyncResult peekBlockingly() throws InterruptedException; + void emitCompleted(TimestampedCollector output); /** -* Poll the first completed {@link AsyncResult} from the head of this queue. This operation is -* blocking and only returns once a completed async result has been found. +* Checks if there is at least one completed element to be emitted. Review comment: I think it is better to remove "to be emitted". `hasCompleted()` should not know or care about the caller's purpose, i.e. whether the caller intends to emit or not. In other words, if `hasCompleted()` were defined only for use by `emitCompleted`, we could perform the check implicitly inside `emitCompleted`, with no need to define an explicit interface method. This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954961 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ## @@ -19,68 +19,56 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import java.util.Collection; +import java.util.List; +import java.util.Optional; /** - * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. + * Interface for a non-blocking stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue { /** -* Put the given element in the queue if capacity is left. If not, then block until this is -* the case. +* Try to put the given element in the queue. This operation succeeds if the queue has capacity left and fails if +* the queue is full. * -* @param streamElementQueueEntry to be put into the queue -* @param Type of the entries future value -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element -*/ -void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; - - /** -* Try to put the given element in the queue. This operation succeeds if the queue has capacity -* left and fails if the queue is full. +* This method returns a handle to the inserted element that allows to set the result of the computation. 
* -* @param streamElementQueueEntry to be inserted -* @param Type of the entries future value -* @return True if the entry could be inserted; otherwise false -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element +* @param streamElement the element to be inserted. +* @return A handle to the element if successful or {@link Optional#empty()} otherwise. */ -boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; + Optional> tryPut(StreamElement streamElement); /** -* Peek at the head of the queue and return the first completed {@link AsyncResult}. This -* operation is a blocking operation and only returns once a completed async result has been -* found. +* Emits one completed element from the head of this queue into the given output. * -* @return Completed {@link AsyncResult} -* @throws InterruptedException if the current thread has been interrupted while waiting for a -* completed async result. +* Will not emit any element if no element has been completed (check {@link #hasCompleted()} before entering +* any critical section). Review comment: ditto: This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954939 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ## @@ -19,68 +19,56 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import java.util.Collection; +import java.util.List; +import java.util.Optional; /** - * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. + * Interface for a non-blocking stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue { /** -* Put the given element in the queue if capacity is left. If not, then block until this is -* the case. +* Try to put the given element in the queue. This operation succeeds if the queue has capacity left and fails if +* the queue is full. * -* @param streamElementQueueEntry to be put into the queue -* @param Type of the entries future value -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element -*/ -void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; - - /** -* Try to put the given element in the queue. This operation succeeds if the queue has capacity -* left and fails if the queue is full. +* This method returns a handle to the inserted element that allows to set the result of the computation. Review comment: I guess we do not need the tail before in code formatting rule. Anything changed now? 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954620 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java ## @@ -19,117 +19,66 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Queue; /** - * Ordered {@link StreamElementQueue} implementation. The ordered stream element queue emits + * Ordered {@link StreamElementQueue} implementation. The ordered stream element queue provides * asynchronous results in the order in which the {@link StreamElementQueueEntry} have been added * to the queue. Thus, even if the completion order can be arbitrary, the output order strictly * follows the insertion order (element cannot overtake each other). 
*/ @Internal -public class OrderedStreamElementQueue implements StreamElementQueue { +public final class OrderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; - /** Executor to run the onCompletion callback. */ - private final Executor executor; - - /** Operator actions to signal a failure to the operator. */ - private final OperatorActions operatorActions; - - /** Lock and conditions for the blocking queue. */ - private final ReentrantLock lock; - private final Condition notFull; - private final Condition headIsCompleted; - /** Queue for the inserted StreamElementQueueEntries. */ - private final ArrayDeque> queue; - - public OrderedStreamElementQueue( - int capacity, - Executor executor, - OperatorActions operatorActions) { + private final Queue> queue; + public OrderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); - this.capacity = capacity; - - this.executor = Preconditions.checkNotNull(executor, "executor"); - - this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions"); - - this.lock = new ReentrantLock(false); - this.headIsCompleted = lock.newCondition(); - this.notFull = lock.newCondition(); + this.capacity = capacity; this.queue = new ArrayDeque<>(capacity); } @Override - public AsyncResult peekBlockingly() throws InterruptedException { - lock.lockInterruptibly(); - - try { - while (queue.isEmpty() || !queue.peek().isDone()) { - headIsCompleted.await(); - } - - LOG.debug("Peeked head element from ordered stream element queue with filling degree " + - "({}/{}).", queue.size(), capacity); - - return queue.peek(); - } finally { - lock.unlock(); - } + public boolean hasCompleted() { + return !queue.isEmpty() && queue.peek().isDone(); } @Override - public AsyncResult poll() throws InterruptedException { - 
lock.lockInterruptibly(); - - try { - while (queue.isEmpty() || !queue.peek().isDone()) { - headIsCompleted.await(); - } - - notFull.signalAll(); - - LOG.debug("Polled head element from ordered stream element queue. New filling degree " + - "({}/{}).", queue.size() - 1, capacity); - - return queue.poll(); - } finally { - lock.unlock(); + public
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954701 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java ## @@ -143,88 +92,31 @@ public int size() { } @Override - public void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - while (queue.size() >= capacity) { - notFull.await(); - } + public Optional> tryPut(StreamElement streamElement) { + if (queue.size() < capacity) { + StreamElementQueueEntry queueEntry = createEntry(streamElement); - addEntry(streamElementQueueEntry); - } finally { - lock.unlock(); - } - } + queue.add(queueEntry); - @Override - public boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - if (queue.size() < capacity) { - addEntry(streamElementQueueEntry); - - LOG.debug("Put element into ordered stream element queue. New filling degree " + - "({}/{}).", queue.size(), capacity); + LOG.debug("Put element into ordered stream element queue. New filling degree " + + "({}/{}).", queue.size(), capacity); - return true; - } else { - LOG.debug("Failed to put element into ordered stream element queue because it " + - "was full ({}/{}).", queue.size(), capacity); + return Optional.of(queueEntry); + } else { + LOG.debug("Failed to put element into ordered stream element queue because it " + + "was full ({}/{}).", queue.size(), capacity); - return false; - } - } finally { - lock.unlock(); + return Optional.empty(); } } - /** -* Add the given {@link StreamElementQueueEntry} to the queue. Additionally, this method -* registers a onComplete callback which is triggered once the given queue entry is completed. 
-* -* @param streamElementQueueEntry to be inserted -* @param Type of the stream element queue entry's result -*/ - private void addEntry(StreamElementQueueEntry streamElementQueueEntry) { - assert(lock.isHeldByCurrentThread()); - - queue.addLast(streamElementQueueEntry); - - streamElementQueueEntry.onComplete( - (StreamElementQueueEntry value) -> { - try { - onCompleteHandler(value); - } catch (InterruptedException e) { - // we got interrupted. This indicates a shutdown of the executor - LOG.debug("AsyncBufferEntry could not be properly completed because the " + - "executor thread has been interrupted.", e); - } catch (Throwable t) { - operatorActions.failOperator(new Exception("Could not complete the " + - "stream element queue entry: " + value + '.', t)); - } - }, - executor); - } - - /** -* Check if the completed {@link StreamElementQueueEntry} is the current head. If this is the -* case, then notify the consumer thread about a new consumable entry. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread is interrupted -*/ - private void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - if (!queue.isEmpty() && queue.peek().isDone()) { - LOG.debug("Signal ordered stream element queue has completed head element."); - headIsCompleted.signalAll(); - } - } finally { -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954599 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java ## @@ -19,117 +19,66 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Queue; /** - * Ordered {@link StreamElementQueue} implementation. The ordered stream element queue emits + * Ordered {@link StreamElementQueue} implementation. The ordered stream element queue provides * asynchronous results in the order in which the {@link StreamElementQueueEntry} have been added * to the queue. Thus, even if the completion order can be arbitrary, the output order strictly * follows the insertion order (element cannot overtake each other). 
*/ @Internal -public class OrderedStreamElementQueue implements StreamElementQueue { +public final class OrderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; - /** Executor to run the onCompletion callback. */ - private final Executor executor; - - /** Operator actions to signal a failure to the operator. */ - private final OperatorActions operatorActions; - - /** Lock and conditions for the blocking queue. */ - private final ReentrantLock lock; - private final Condition notFull; - private final Condition headIsCompleted; - /** Queue for the inserted StreamElementQueueEntries. */ - private final ArrayDeque> queue; - - public OrderedStreamElementQueue( - int capacity, - Executor executor, - OperatorActions operatorActions) { + private final Queue> queue; + public OrderedStreamElementQueue(int capacity) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); - this.capacity = capacity; - - this.executor = Preconditions.checkNotNull(executor, "executor"); - - this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions"); - - this.lock = new ReentrantLock(false); - this.headIsCompleted = lock.newCondition(); - this.notFull = lock.newCondition(); + this.capacity = capacity; this.queue = new ArrayDeque<>(capacity); } @Override - public AsyncResult peekBlockingly() throws InterruptedException { - lock.lockInterruptibly(); - - try { - while (queue.isEmpty() || !queue.peek().isDone()) { - headIsCompleted.await(); - } - - LOG.debug("Peeked head element from ordered stream element queue with filling degree " + - "({}/{}).", queue.size(), capacity); - - return queue.peek(); - } finally { - lock.unlock(); - } + public boolean hasCompleted() { + return !queue.isEmpty() && queue.peek().isDone(); } @Override - public AsyncResult poll() throws InterruptedException { - 
lock.lockInterruptibly(); - - try { - while (queue.isEmpty() || !queue.peek().isDone()) { - headIsCompleted.await(); - } - - notFull.signalAll(); - - LOG.debug("Polled head element from ordered stream element queue. New filling degree " + - "({}/{}).", queue.size() - 1, capacity); - - return queue.poll(); - } finally { - lock.unlock(); + public
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954679 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java ## @@ -143,88 +92,31 @@ public int size() { } @Override - public void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - while (queue.size() >= capacity) { - notFull.await(); - } + public Optional> tryPut(StreamElement streamElement) { + if (queue.size() < capacity) { + StreamElementQueueEntry queueEntry = createEntry(streamElement); - addEntry(streamElementQueueEntry); - } finally { - lock.unlock(); - } - } + queue.add(queueEntry); - @Override - public boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - if (queue.size() < capacity) { - addEntry(streamElementQueueEntry); - - LOG.debug("Put element into ordered stream element queue. New filling degree " + - "({}/{}).", queue.size(), capacity); + LOG.debug("Put element into ordered stream element queue. New filling degree " + + "({}/{}).", queue.size(), capacity); - return true; - } else { - LOG.debug("Failed to put element into ordered stream element queue because it " + - "was full ({}/{}).", queue.size(), capacity); + return Optional.of(queueEntry); + } else { + LOG.debug("Failed to put element into ordered stream element queue because it " + + "was full ({}/{}).", queue.size(), capacity); - return false; - } - } finally { - lock.unlock(); + return Optional.empty(); } } - /** -* Add the given {@link StreamElementQueueEntry} to the queue. Additionally, this method -* registers a onComplete callback which is triggered once the given queue entry is completed. 
-* -* @param streamElementQueueEntry to be inserted -* @param Type of the stream element queue entry's result -*/ - private void addEntry(StreamElementQueueEntry streamElementQueueEntry) { - assert(lock.isHeldByCurrentThread()); - - queue.addLast(streamElementQueueEntry); - - streamElementQueueEntry.onComplete( - (StreamElementQueueEntry value) -> { - try { - onCompleteHandler(value); - } catch (InterruptedException e) { - // we got interrupted. This indicates a shutdown of the executor - LOG.debug("AsyncBufferEntry could not be properly completed because the " + - "executor thread has been interrupted.", e); - } catch (Throwable t) { - operatorActions.failOperator(new Exception("Could not complete the " + - "stream element queue entry: " + value + '.', t)); - } - }, - executor); - } - - /** -* Check if the completed {@link StreamElementQueueEntry} is the current head. If this is the -* case, then notify the consumer thread about a new consumable entry. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread is interrupted -*/ - private void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - if (!queue.isEmpty() && queue.peek().isDone()) { - LOG.debug("Signal ordered stream element queue has completed head element."); - headIsCompleted.signalAll(); - } - } finally { -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954701 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java ## @@ -143,88 +92,31 @@ public int size() { } @Override - public void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - while (queue.size() >= capacity) { - notFull.await(); - } + public Optional> tryPut(StreamElement streamElement) { + if (queue.size() < capacity) { + StreamElementQueueEntry queueEntry = createEntry(streamElement); - addEntry(streamElementQueueEntry); - } finally { - lock.unlock(); - } - } + queue.add(queueEntry); - @Override - public boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - if (queue.size() < capacity) { - addEntry(streamElementQueueEntry); - - LOG.debug("Put element into ordered stream element queue. New filling degree " + - "({}/{}).", queue.size(), capacity); + LOG.debug("Put element into ordered stream element queue. New filling degree " + + "({}/{}).", queue.size(), capacity); - return true; - } else { - LOG.debug("Failed to put element into ordered stream element queue because it " + - "was full ({}/{}).", queue.size(), capacity); + return Optional.of(queueEntry); + } else { + LOG.debug("Failed to put element into ordered stream element queue because it " + + "was full ({}/{}).", queue.size(), capacity); - return false; - } - } finally { - lock.unlock(); + return Optional.empty(); } } - /** -* Add the given {@link StreamElementQueueEntry} to the queue. Additionally, this method -* registers a onComplete callback which is triggered once the given queue entry is completed. 
-* -* @param streamElementQueueEntry to be inserted -* @param Type of the stream element queue entry's result -*/ - private void addEntry(StreamElementQueueEntry streamElementQueueEntry) { - assert(lock.isHeldByCurrentThread()); - - queue.addLast(streamElementQueueEntry); - - streamElementQueueEntry.onComplete( - (StreamElementQueueEntry value) -> { - try { - onCompleteHandler(value); - } catch (InterruptedException e) { - // we got interrupted. This indicates a shutdown of the executor - LOG.debug("AsyncBufferEntry could not be properly completed because the " + - "executor thread has been interrupted.", e); - } catch (Throwable t) { - operatorActions.failOperator(new Exception("Could not complete the " + - "stream element queue entry: " + value + '.', t)); - } - }, - executor); - } - - /** -* Check if the completed {@link StreamElementQueueEntry} is the current head. If this is the -* case, then notify the consumer thread about a new consumable entry. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread is interrupted -*/ - private void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - if (!queue.isEmpty() && queue.peek().isDone()) { - LOG.debug("Signal ordered stream element queue has completed head element."); - headIsCompleted.signalAll(); - } - } finally { -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954911 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ## @@ -19,68 +19,56 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import java.util.Collection; +import java.util.List; +import java.util.Optional; /** - * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. + * Interface for a non-blocking stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue { /** -* Put the given element in the queue if capacity is left. If not, then block until this is -* the case. +* Try to put the given element in the queue. This operation succeeds if the queue has capacity left and fails if Review comment: Try -> Tries This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954679 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java ## @@ -143,88 +92,31 @@ public int size() { } @Override - public void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - while (queue.size() >= capacity) { - notFull.await(); - } + public Optional> tryPut(StreamElement streamElement) { + if (queue.size() < capacity) { + StreamElementQueueEntry queueEntry = createEntry(streamElement); - addEntry(streamElementQueueEntry); - } finally { - lock.unlock(); - } - } + queue.add(queueEntry); - @Override - public boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - if (queue.size() < capacity) { - addEntry(streamElementQueueEntry); - - LOG.debug("Put element into ordered stream element queue. New filling degree " + - "({}/{}).", queue.size(), capacity); + LOG.debug("Put element into ordered stream element queue. New filling degree " + + "({}/{}).", queue.size(), capacity); - return true; - } else { - LOG.debug("Failed to put element into ordered stream element queue because it " + - "was full ({}/{}).", queue.size(), capacity); + return Optional.of(queueEntry); + } else { + LOG.debug("Failed to put element into ordered stream element queue because it " + + "was full ({}/{}).", queue.size(), capacity); - return false; - } - } finally { - lock.unlock(); + return Optional.empty(); } } - /** -* Add the given {@link StreamElementQueueEntry} to the queue. Additionally, this method -* registers a onComplete callback which is triggered once the given queue entry is completed. 
-* -* @param streamElementQueueEntry to be inserted -* @param Type of the stream element queue entry's result -*/ - private void addEntry(StreamElementQueueEntry streamElementQueueEntry) { - assert(lock.isHeldByCurrentThread()); - - queue.addLast(streamElementQueueEntry); - - streamElementQueueEntry.onComplete( - (StreamElementQueueEntry value) -> { - try { - onCompleteHandler(value); - } catch (InterruptedException e) { - // we got interrupted. This indicates a shutdown of the executor - LOG.debug("AsyncBufferEntry could not be properly completed because the " + - "executor thread has been interrupted.", e); - } catch (Throwable t) { - operatorActions.failOperator(new Exception("Could not complete the " + - "stream element queue entry: " + value + '.', t)); - } - }, - executor); - } - - /** -* Check if the completed {@link StreamElementQueueEntry} is the current head. If this is the -* case, then notify the consumer thread about a new consumable entry. -* -* @param streamElementQueueEntry which has been completed -* @throws InterruptedException if the current thread is interrupted -*/ - private void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - if (!queue.isEmpty() && queue.peek().isDone()) { - LOG.debug("Signal ordered stream element queue has completed head element."); - headIsCompleted.signalAll(); - } - } finally { -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327954640 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java ## @@ -143,88 +92,31 @@ public int size() { } @Override - public void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - lock.lockInterruptibly(); - - try { - while (queue.size() >= capacity) { - notFull.await(); - } + public Optional> tryPut(StreamElement streamElement) { + if (queue.size() < capacity) { + StreamElementQueueEntry queueEntry = createEntry(streamElement); Review comment: `StreamElementQueueEntry queueEntry` to avoid unchecked. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327552339 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java ## @@ -19,68 +19,54 @@ package org.apache.flink.streaming.api.operators.async.queue; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import java.util.Collection; +import java.util.List; +import java.util.Optional; /** - * Interface for blocking stream element queues for the {@link AsyncWaitOperator}. + * Interface for a non-blocking stream element queues for the {@link AsyncWaitOperator}. */ @Internal -public interface StreamElementQueue { +public interface StreamElementQueue { /** -* Put the given element in the queue if capacity is left. If not, then block until this is -* the case. +* Try to put the given element in the queue. This operation succeeds if the queue has capacity left and fails if +* the queue is full. * -* @param streamElementQueueEntry to be put into the queue -* @param Type of the entries future value -* @throws InterruptedException if the calling thread has been interrupted while waiting to -* insert the given element -*/ -void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; - - /** -* Try to put the given element in the queue. This operation succeeds if the queue has capacity -* left and fails if the queue is full. 
-* -* @param streamElementQueueEntry to be inserted -* @param Type of the entries future value +* @param streamElement to be inserted * @return True if the entry could be inserted; otherwise false Review comment: We need to adjust the `@return` Javadoc because the method no longer returns a boolean value. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327551382 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -209,41 +176,34 @@ else if (element.isLatencyMarker()) { } @Override - public void processElement(StreamRecord element) throws Exception { - final StreamRecordQueueEntry streamRecordBufferEntry = new StreamRecordQueueEntry<>(element); + public void processElement(final StreamRecord element) throws Exception { + // add element first to the queue + final ResultFuture entry = addToWorkQueue(element); + + final ResultHandler resultHandler = new ResultHandler(element, entry); + // register a timeout for the entry if timeout is configured if (timeout > 0L) { - // register a timeout for this AsyncStreamRecordBufferEntry - long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); - - final ScheduledFuture timerFuture = getProcessingTimeService().registerTimer( - timeoutTimestamp, - new ProcessingTimeCallback() { - @Override - public void onProcessingTime(long timestamp) throws Exception { - userFunction.timeout(element.getValue(), streamRecordBufferEntry); - } - }); - - // Cancel the timer once we've completed the stream record buffer entry. 
This will remove - // the register trigger task - streamRecordBufferEntry.onComplete( - (StreamElementQueueEntry> value) -> { - timerFuture.cancel(true); - }, - executor); - } + final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); - addAsyncBufferEntry(streamRecordBufferEntry); + final ScheduledFuture timeoutTimer = getProcessingTimeService().registerTimer( + timeoutTimestamp, + timestamp -> userFunction.timeout(element.getValue(), resultHandler)); - userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry); + resultHandler.setTimeoutTimer(timeoutTimer); Review comment: I have a potential concern about the timeout timer being set this late. If the timer registered by `registerTimer` above triggers before this setter is called, and `mailboxExecutor.execute` in `ResultHandler#complete` were also to run immediately, then the not-yet-set timer would never be canceled. I know this cannot happen at the moment, because `mailboxExecutor.execute` only inserts the mail into the queue and does not execute it right away, so maybe my worry can be ignored. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327551424 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -209,41 +176,31 @@ else if (element.isLatencyMarker()) { } @Override - public void processElement(StreamRecord element) throws Exception { - final StreamRecordQueueEntry streamRecordBufferEntry = new StreamRecordQueueEntry<>(element); + public void processElement(final StreamRecord element) throws Exception { + // add element first to the queue + final ResultFuture entry = addToWorkQueue(element); + + final ResultHandler resultHandler = new ResultHandler(element, entry); + // register a timeout for the entry if timeout is configured if (timeout > 0L) { - // register a timeout for this AsyncStreamRecordBufferEntry - long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); - - final ScheduledFuture timerFuture = getProcessingTimeService().registerTimer( - timeoutTimestamp, - new ProcessingTimeCallback() { - @Override - public void onProcessingTime(long timestamp) throws Exception { - userFunction.timeout(element.getValue(), streamRecordBufferEntry); - } - }); - - // Cancel the timer once we've completed the stream record buffer entry. 
This will remove - // the register trigger task - streamRecordBufferEntry.onComplete( - (StreamElementQueueEntry> value) -> { - timerFuture.cancel(true); - }, - executor); - } + final long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); - addAsyncBufferEntry(streamRecordBufferEntry); + final ScheduledFuture timeoutTimer = getProcessingTimeService().registerTimer( + timeoutTimestamp, + timestamp -> userFunction.timeout(element.getValue(), resultHandler)); - userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry); + resultHandler.setTimeoutTimer(timeoutTimer); + } + + userFunction.asyncInvoke(element.getValue(), resultHandler); } @Override public void processWatermark(Watermark mark) throws Exception { - WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark); + addToWorkQueue(mark); - addAsyncBufferEntry(watermarkBufferEntry); + outputCompletedElements(); Review comment: Sorry for bringing up this issue again. I regard it as a separate improvement outside the scope of this PR, but I could also accept it if you wish. I am not quite sure whether it is worth verifying the effect of this improvement; maybe it is not really necessary. :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327551484 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -276,10 +232,10 @@ public void snapshotState(StateSnapshotContext context) throws Exception { @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); + Review comment: Nit: unnecessary change. If we want to reformat the code, it is better to do so in a separate hotfix commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327551509 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -276,10 +232,10 @@ public void snapshotState(StateSnapshotContext context) throws Exception { @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); + recoveredStreamElements = context .getOperatorStateStore() .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); - Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327551549 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -293,140 +249,113 @@ public void close() throws Exception { waitInFlightInputsFinished(); } finally { - Exception exception = null; - - try { - super.close(); - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; - } - - try { - // terminate the emitter, the emitter thread and the executor - stopResources(true); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); - } + super.close(); } } - @Override - public void dispose() throws Exception { - Exception exception = null; + /** +* Add the given stream element to the operator's stream element queue. This operation blocks until the element +* has been added. +* +* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well +* as asynchronous results can be processed. +* +* @param streamElement to add to the operator's queue +* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox +* @return a handle that allows to set the result of the async computation for the given element. 
+*/ + private ResultFuture addToWorkQueue(StreamElement streamElement) throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - try { - super.dispose(); - } catch (InterruptedException interrupted) { - exception = interrupted; + pendingStreamElement = streamElement; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; + Optional> queueEntry; + while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { + mailboxExecutor.yield(); Review comment: How do we guarantee that `yield()` will eventually consume the elements from `queue`? Previously, the `EmitterThread` consumed the queue, but now I do not see any runnable being submitted to the mailbox to consume the queue. I know `outputCompletedElements()` could empty the queue, but it cannot happen before calling `userFunction.asyncInvoke(element.getValue(), resultHandler)`, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327551609 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -293,140 +249,113 @@ public void close() throws Exception { waitInFlightInputsFinished(); } finally { - Exception exception = null; - - try { - super.close(); - } catch (InterruptedException interrupted) { - exception = interrupted; - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; - } - - try { - // terminate the emitter, the emitter thread and the executor - stopResources(true); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); - - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - if (exception != null) { - LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); - } + super.close(); } } - @Override - public void dispose() throws Exception { - Exception exception = null; + /** +* Add the given stream element to the operator's stream element queue. This operation blocks until the element +* has been added. +* +* Between two insertion attempts, this method yields the execution to the mailbox, such that events as well +* as asynchronous results can be processed. +* +* @param streamElement to add to the operator's queue +* @throws InterruptedException if the current thread has been interrupted while yielding to mailbox +* @return a handle that allows to set the result of the async computation for the given element. 
+*/ + private ResultFuture addToWorkQueue(StreamElement streamElement) throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - try { - super.dispose(); - } catch (InterruptedException interrupted) { - exception = interrupted; + pendingStreamElement = streamElement; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = e; + Optional> queueEntry; + while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { + mailboxExecutor.yield(); } - try { - stopResources(false); - } catch (InterruptedException interrupted) { - exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); + pendingStreamElement = null; - Thread.currentThread().interrupt(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + return queueEntry.get(); + } + + private void waitInFlightInputsFinished() throws InterruptedException { + assert(Thread.holdsLock(checkpointingLock)); - if (exception != null) { - throw exception; + while (!queue.isEmpty()) { + mailboxExecutor.yield(); } } /** -* Close the operator's resources. They include the emitter thread and the executor to run -* the queue's complete operation. +* Batch output of all completed elements. Watermarks are always completed if it's their turn to be processed. * -* @param waitForShutdown is true if the method should wait for the resources to be freed; -* otherwise false. -* @throws InterruptedException if current thread has been interrupted +* This method will be called from {@link #processWatermark(Watermark)} and from a mail processing the result +* of an async function call. */ - private void stopResources(boolean waitForShutdown) throws InterruptedException { - emitter.stop(); -
[GitHub] [flink] zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r327551336 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -209,41 +176,34 @@ else if (element.isLatencyMarker()) { } @Override - public void processElement(StreamRecord element) throws Exception { - final StreamRecordQueueEntry streamRecordBufferEntry = new StreamRecordQueueEntry<>(element); + public void processElement(final StreamRecord element) throws Exception { Review comment: Nit: this change should preferably not be mixed with the core changes, unless this line is also touched by the core change, in which case we could add the `final` in passing. For consistency, note that the following `processWatermark(Watermark mark)` has no final argument either. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services