[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318543002
## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ##
@@ -286,13 +334,12 @@ public void setup(TypeSerializer outputSerializer) {
   createStreamTaskStateManager(environment, stateBackend, processingTimeService);
   mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);
- if (operator instanceof SetupableStreamOperator) {
+ if (operator == null) {
Review comment:
These changes overwrite lines introduced in the previous commit, so something is not quite right here. Shouldn't some or all of the changes from this commit that touch the TestHarnesses be squashed into (or moved to) the previous commit?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318419679
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ##
@@ -148,12 +150,17 @@ public OperatorChain(
   chainedConfigs,
   userCodeClassloader,
   streamOutputMap,
- allOps);
+ allOps,
Review comment:
nit: formatting (the same issue appears in other places in this commit)
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318420614
## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ##
@@ -283,13 +287,23 @@ public void setup(TypeSerializer outputSerializer) {
  streamTaskStateInitializer = createStreamTaskStateManager(environment, stateBackend, processingTimeService);
  mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);
+ if (operator instanceof SetupableStreamOperator) {
    ((SetupableStreamOperator) operator).setup(mockTask, config, new MockOutput(outputSerializer));
  }
+ SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator);
Review comment:
@AHeise I'm re-opening this issue, as I do not see that it has changed, and it also duplicates some code from `StreamOperatorFactoryUtil.createOperator`. Am I missing something?
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318419434
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/execution/MailboxProcessor.java ##
@@ -106,33 +109,37 @@ public void open() {
   * Lifecycle method to close the mailbox for action submission.
   */
  public void prepareClose() {
-   mailboxExecutor.shutdown();
+   mailbox.quiesce();
  }

  /**
   * Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all instances of
   * {@link java.util.concurrent.RunnableFuture} that are still contained in the mailbox.
   */
  public void close() {
-   List droppedLetters = mailboxExecutor.shutdownNow();
+   List droppedLetters = mailbox.close();
    if (!droppedLetters.isEmpty()) {
      LOG.debug("Closing the mailbox dropped letters {}.", droppedLetters);
      FutureUtils.cancelRunnableFutures(droppedLetters);
    }
  }

+ private boolean isMailboxThread() {
+   return Thread.currentThread() == mailboxThread;
+ }
+
  /**
   * Runs the mailbox processing loop. This is where the main work is done.
   */
  public void runMailboxLoop() throws Exception {
    Preconditions.checkState(
-     mailboxExecutor.isMailboxThread(),
+     isMailboxThread(),
Review comment:
nit: formatting?
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318428252
## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ##
@@ -150,32 +153,76 @@ public AbstractStreamOperatorTestHarness(
  int subtaskIndex,
  OperatorID operatorID) throws Exception {
  this(
-   operator,
-   new MockEnvironmentBuilder()
-     .setTaskName("MockTask")
-     .setMemorySize(3 * 1024 * 1024)
-     .setInputSplitProvider(new MockInputSplitProvider())
-     .setBufferSize(1024)
-     .setMaxParallelism(maxParallelism)
-     .setParallelism(parallelism)
-     .setSubtaskIndex(subtaskIndex)
-     .build(),
-   true,
-   operatorID);
+   operator,
Review comment:
nit: formatting
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r318419056
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ##
@@ -1194,6 +1197,37 @@ public ExecutionConfig getExecutionConfig() {
    return returnStream;
  }

+ /**
+  * Method for passing user defined operators along with the type information that will transform the DataStream.
+  *
+  * @param operatorName name of the operator, for logging purposes
+  * @param outTypeInfo the output type of the operator
+  * @param operatorFactory the factory for the operator.
+  * @param type of the return stream
+  * @return the data stream constructed
+  */
+ @PublicEvolving
+ public SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo,
+   OneInputStreamOperatorFactory operatorFactory) {
Review comment:
It makes things less convenient for power users, but it's necessary for a cleaner API and cleaner runtime code. I would mark it as deprecated, because I would like at least Flink's built-in operators to move away from this old API in favour of `StreamOperatorFactory` as soon as possible. Ultimately we can merge it with or without the `@Deprecated` annotation; that question is more for @aljoscha and what he prefers.
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317577599
## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java ##
@@ -58,9 +58,10 @@
   * Compare the two queues containing operator/task output by converting them to an array first.
   */
  public static void assertOutputEquals(String message, Queue expected, Queue actual) {
-   Assert.assertArrayEquals(message,
-     expected.toArray(),
-     actual.toArray());
+   Assert.assertEquals(
Review comment:
Are these changes related to this commit? If not, could you move them into a separate commit?
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317581559
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ##
@@ -162,12 +166,20 @@ public void setup(StreamTask containingTask, StreamConfig config, Output(checkpointingLock, output, queue, this);
+ this.emitter = new Emitter<>(checkpointingLock,
Review comment:
nit: I think the recent consensus about wrapping parameters was that the first parameter should also go on a new line: https://github.com/apache/flink-web/pull/254
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317625597
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ##
@@ -1194,6 +1197,37 @@ public ExecutionConfig getExecutionConfig() {
    return returnStream;
  }

+ /**
+  * Method for passing user defined operators along with the type information that will transform the DataStream.
+  *
+  * @param operatorName name of the operator, for logging purposes
+  * @param outTypeInfo the output type of the operator
+  * @param operatorFactory the factory for the operator.
+  * @param type of the return stream
+  * @return the data stream constructed
+  */
+ @PublicEvolving
+ public SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo,
Review comment:
nit: formatting (each parameter on a separate line)
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317573452
## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java ##
@@ -72,6 +74,10 @@ protected void init() throws Exception {
  // re-initialize the operator with the correct collector.
  StreamOperatorFactory operatorFactory = configuration.getStreamOperatorFactory(getUserCodeClassLoader());
  headOperator = operatorFactory.createStreamOperator(this, configuration, new CollectorWrapper<>(collector));
+ if (operatorFactory instanceof YieldingOperatorFactory) {
Review comment:
Should we deduplicate this `if` check? It's present in 4 different places. Maybe as a static helper method in `YieldingOperatorFactory`?

Edit: I would also deduplicate this logic together with the `createStreamOperator(...)` call, because now in four different places (and in all future ones) we must make sure that `setMailboxExecutor(...)` has been called before creating the `StreamOperator`. This place, for example, does it after the creation. Another place below looks like it is not creating the operator at all. So I would extract this logic into a static method in some utility class, e.g. `StreamOperatorFactoryUtil`?
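The extraction suggested in the comment above could look roughly like the following sketch. It is not the Flink code: every type here (`MailboxExecutorSketch`, `OperatorSketch`, `OperatorFactorySketch`, `YieldingFactorySketch`, `OperatorFactoryUtilSketch`, `RecordingYieldingFactory`) is a hypothetical, simplified stand-in introduced for illustration, and the real `createStreamOperator(...)` takes more arguments. The only point is that a single static helper fixes the order "set the executor first, then create the operator" for every call site.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MailboxExecutor.
interface MailboxExecutorSketch {
    void execute(Runnable command);
}

// Hypothetical stand-in for StreamOperator.
interface OperatorSketch {
    String describe();
}

// Hypothetical stand-in for StreamOperatorFactory (arguments omitted).
interface OperatorFactorySketch {
    OperatorSketch createStreamOperator();
}

// Hypothetical stand-in for YieldingOperatorFactory.
interface YieldingFactorySketch extends OperatorFactorySketch {
    void setMailboxExecutor(MailboxExecutorSketch executor);
}

final class OperatorFactoryUtilSketch {
    private OperatorFactoryUtilSketch() {}

    /** Single entry point: inject the executor (if the factory wants one), then create the operator. */
    static OperatorSketch createOperator(OperatorFactorySketch factory, MailboxExecutorSketch executor) {
        if (factory instanceof YieldingFactorySketch) {
            ((YieldingFactorySketch) factory).setMailboxExecutor(executor);
        }
        return factory.createStreamOperator();
    }
}

// Test double that records the order in which it is called.
final class RecordingYieldingFactory implements YieldingFactorySketch {
    final List<String> calls = new ArrayList<>();
    private MailboxExecutorSketch executor;

    @Override
    public void setMailboxExecutor(MailboxExecutorSketch executor) {
        calls.add("setMailboxExecutor");
        this.executor = executor;
    }

    @Override
    public OperatorSketch createStreamOperator() {
        calls.add("createStreamOperator");
        final boolean hasExecutor = executor != null;
        return () -> hasExecutor ? "operator with executor" : "operator without executor";
    }
}
```

With such a helper, a call site like the one in `BoundedStreamTask` would no longer need its own `instanceof` check, and it could not accidentally set the executor after the operator has already been created.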
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317626081
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java ##
@@ -17,22 +17,68 @@
 package org.apache.flink.streaming.api.operators.async;

-import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;

 /**
  * The factory of {@link AsyncWaitOperator}.
  *
  * @param The output type of the operator
  */
-public class AsyncWaitOperatorFactory extends SimpleUdfStreamOperatorFactory implements YieldingOperatorFactory {
-  public AsyncWaitOperatorFactory(AsyncWaitOperator operator) {
-    super(operator);
+public class AsyncWaitOperatorFactory implements OneInputStreamOperatorFactory, YieldingOperatorFactory {
+  private final AsyncFunction asyncFunction;
+  private final long timeout;
+  private final int capacity;
+  private final AsyncDataStream.OutputMode outputMode;
+  private MailboxExecutor mailboxExecutor;
+  private ChainingStrategy strategy = ChainingStrategy.HEAD;
+
+  public AsyncWaitOperatorFactory(AsyncFunction asyncFunction,
+      long timeout,
+      int capacity,
+      AsyncDataStream.OutputMode outputMode) {
+    this.asyncFunction = asyncFunction;
+    this.timeout = timeout;
+    this.capacity = capacity;
+    this.outputMode = outputMode;
   }

   @Override
   public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
-    AsyncWaitOperator operator = (AsyncWaitOperator) getOperator();
-    operator.setMailboxExecutor(mailboxExecutor);
+    this.mailboxExecutor = mailboxExecutor;
+  }
+
+  @Override public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config,
+      Output output) {
+    AsyncWaitOperator asyncWaitOperator = new AsyncWaitOperator(asyncFunction,
+      timeout,
+      capacity,
+      outputMode,
+      mailboxExecutor);
Review comment:
`checkNotNull(mailboxExecutor)`?
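To illustrate why the null check matters: the executor is injected through a setter after construction, so nothing in the type system prevents `createStreamOperator(...)` from running first. The sketch below uses a hypothetical `GuardedFactorySketch` class with a plain `Runnable` standing in for `MailboxExecutor`, and `java.util.Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull` so that it stays self-contained.

```java
import java.util.Objects;

// Hypothetical, simplified factory: the executor arrives via a setter, not the constructor.
final class GuardedFactorySketch {
    private Runnable mailboxExecutor; // stand-in for MailboxExecutor

    void setMailboxExecutor(Runnable mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    String createOperator() {
        // Fail fast with a clear message instead of handing a null executor to the operator.
        Objects.requireNonNull(mailboxExecutor, "mailboxExecutor must be set before creating the operator");
        return "operator";
    }
}
```

Without the guard, a missing `setMailboxExecutor(...)` call would presumably only surface later, when the operator first tries to yield.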
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317627543
## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ##
@@ -170,13 +170,10 @@ public void run() {
  }

  /**
-  * A special {@link AsyncFunction} without issuing
-  * {@link ResultFuture#complete} until the latch counts to zero.
-  * {@link ResultFuture#complete} until the latch counts to zero.
-  * This function is used in the testStateSnapshotAndRestore, ensuring
-  * that {@link StreamElementQueueEntry} can stay
-  * in the {@link StreamElementQueue} to be
-  * snapshotted while checkpointing.
+  * A special {@link AsyncFunction} without issuing {@link ResultFuture#complete} until the latch counts to zero.
Review comment:
Are the changes in this file relevant to the commit? They look like an auto-formatting issue. Also, some of those "auto formatted" lines do not seem to follow our code style.
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317574128
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java ##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
+
+/**
+ * An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators needs to be created
+ * through a factory implementing this interface.
+ */
+public interface YieldingOperatorFactory {
Review comment:
Shouldn't this extend `StreamOperatorFactory`? I know it doesn't matter from the code perspective, but it might be more self-documenting.
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317578748
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ##
@@ -423,18 +434,25 @@ public boolean hasSelectiveReadingOperator() {
  ClassLoader userCodeClassloader,
  Map> streamOutputs,
  List> allOperators,
- OutputTag outputTag) {
+ OutputTag outputTag,
+ IntFunction mailboxExecutorFactory) {
Review comment:
Instead of `IntFunction`, maybe define a simple `MailboxFactory` class that has a nicely named method with a named parameter? As it is now, readability depends heavily on the usage, and from the signature of this constructor it's not obvious what this `IntFunction` should accept. Actually, even the usage here is not obvious: `mailboxExecutorFactory.apply(operatorConfig.getChainIndex());`, since we think of this parameter as a "priority" and call it that for the sake of yielding. Another issue is that searching for usages of such generic interfaces is a bit problematic.
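A minimal sketch of the named-interface alternative follows. The names `MailboxExecutorFactorySketch`, `PriorityExecutorSketch` and `ChainSketch` are hypothetical and only illustrate the shape; the comment itself proposes a `MailboxFactory` without specifying its signature.

```java
// Hypothetical stand-in for a MailboxExecutor bound to one priority.
final class PriorityExecutorSketch {
    final int priority;

    PriorityExecutorSketch(int priority) {
        this.priority = priority;
    }
}

// Named replacement for IntFunction: the method and parameter names document the contract.
@FunctionalInterface
interface MailboxExecutorFactorySketch {
    /**
     * @param priority the priority of letters sent through the returned executor;
     *                 in the reviewed code this is the operator's chain index
     */
    PriorityExecutorSketch createExecutor(int priority);
}

final class ChainSketch {
    private ChainSketch() {}

    // The call site now reads as "create an executor with this priority" rather than "apply(int)".
    static PriorityExecutorSketch executorFor(int chainIndex, MailboxExecutorFactorySketch factory) {
        return factory.createExecutor(chainIndex);
    }
}
```

Because the interface is still functional, existing lambdas or method references could keep working, while "find usages" on `createExecutor` returns only mailbox-related call sites instead of every `IntFunction.apply`.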
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317628154
## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ##
@@ -165,18 +169,61 @@ public AbstractStreamOperatorTestHarness(
    operatorID);
  }

+ public AbstractStreamOperatorTestHarness(
+   StreamOperatorFactory factory,
+   MockEnvironment env) throws Exception {
+   this(null, factory, env, false, new OperatorID());
+ }
+
+ public AbstractStreamOperatorTestHarness(
+   StreamOperatorFactory factory,
+   int maxParallelism,
+   int parallelism,
+   int subtaskIndex) throws Exception {
+   this(
+   factory,
Review comment:
nit: something doesn't look right with this formatting.
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317575004
## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ##
@@ -283,13 +287,23 @@ public void setup(TypeSerializer outputSerializer) {
  streamTaskStateInitializer = createStreamTaskStateManager(environment, stateBackend, processingTimeService);
  mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);
+ if (operator instanceof SetupableStreamOperator) {
    ((SetupableStreamOperator) operator).setup(mockTask, config, new MockOutput(outputSerializer));
  }
+ SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator);
Review comment:
I'm a bit confused here. How should this work? `factory.createStreamOperator(...)` is not being called here.
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315072874
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ##
@@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
   * Add the given stream element queue entry to the operator's stream element queue. This
   * operation blocks until the element has been added.
   *
-  * For that it tries to put the element into the queue and if not successful then it waits on
-  * the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output
-  * elements. The emitter is also responsible for notifying this method if the queue has capacity
-  * left again, by calling notifyAll on the checkpointing lock.
-  *
   * @param streamElementQueueEntry to add to the operator's queue
   * @param Type of the stream element queue entry's result
   * @throws InterruptedException if the current thread has been interrupted
   */
  private void addAsyncBufferEntry(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
-   pendingStreamElementQueueEntry = streamElementQueueEntry;
-
    while (!queue.tryPut(streamElementQueueEntry)) {
-     // we wait for the emitter to notify us if the queue has space left again
-     checkpointingLock.wait();
+     yieldToDownstream();
    }
-
-   pendingStreamElementQueueEntry = null;
  }

- private void waitInFlightInputsFinished() throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
+ private void waitInFlightInputsFinished() {
    while (!queue.isEmpty()) {
-     // wait for the emitter thread to output the remaining elements
-     // for that he needs the checkpointing lock and thus we have to free it
-     checkpointingLock.wait();
+     yieldToDownstream();
Review comment:
Couldn't we block while yielding? In other words, replace `tryYield` with `yield`?
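The difference between the two yielding styles can be sketched with a plain `BlockingQueue` as the mailbox. This is only an illustration under stated assumptions: `MailboxSketch` and `WaitLoopSketch` are hypothetical classes, not the Flink `MailboxExecutor` API, and the blocking variant is named `yieldBlocking` because `yield` is a restricted identifier in recent Java versions. A loop around `tryYield()` spins when the mailbox is empty, whereas a loop around the blocking variant parks the thread until a letter arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified mailbox: letters are Runnables executed by the waiting thread.
final class MailboxSketch {
    private final BlockingQueue<Runnable> letters = new LinkedBlockingQueue<>();

    void send(Runnable letter) {
        letters.add(letter);
    }

    /** Non-blocking: runs one letter if present; returns false immediately otherwise. */
    boolean tryYield() {
        Runnable letter = letters.poll();
        if (letter == null) {
            return false;
        }
        letter.run();
        return true;
    }

    /** Blocking: parks the calling thread until a letter is available, then runs it. */
    void yieldBlocking() {
        try {
            letters.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a letter", e);
        }
    }
}

final class WaitLoopSketch {
    private WaitLoopSketch() {}

    /** Waits until no inputs are in flight, processing letters instead of spinning. */
    static int waitInFlightInputsFinished(MailboxSketch mailbox, AtomicInteger inFlight) {
        int lettersProcessed = 0;
        while (inFlight.get() > 0) {
            mailbox.yieldBlocking(); // no CPU burned while the mailbox is empty
            lettersProcessed++;
        }
        return lettersProcessed;
    }
}
```

The blocking form only terminates if whatever changes the loop condition (here, decrementing `inFlight`) is itself delivered as a letter; if progress can also happen outside the mailbox, the loop would need some other wake-up signal.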
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315066822
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ##
@@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
   * Add the given stream element queue entry to the operator's stream element queue. This
   * operation blocks until the element has been added.
   *
-  * For that it tries to put the element into the queue and if not successful then it waits on
-  * the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output
-  * elements. The emitter is also responsible for notifying this method if the queue has capacity
-  * left again, by calling notifyAll on the checkpointing lock.
-  *
   * @param streamElementQueueEntry to add to the operator's queue
   * @param Type of the stream element queue entry's result
   * @throws InterruptedException if the current thread has been interrupted
   */
  private void addAsyncBufferEntry(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
-   pendingStreamElementQueueEntry = streamElementQueueEntry;
-
    while (!queue.tryPut(streamElementQueueEntry)) {
-     // we wait for the emitter to notify us if the queue has space left again
-     checkpointingLock.wait();
+     yieldToDownstream();
    }
-
-   pendingStreamElementQueueEntry = null;
  }

- private void waitInFlightInputsFinished() throws InterruptedException {
-   assert(Thread.holdsLock(checkpointingLock));
-
+ private void waitInFlightInputsFinished() {
    while (!queue.isEmpty()) {
-     // wait for the emitter thread to output the remaining elements
-     // for that he needs the checkpointing lock and thus we have to free it
-     checkpointingLock.wait();
+     yieldToDownstream();
Review comment:
Aren't we busy waiting here?
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r314654753
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ##
@@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
  pendingStreamElementQueueEntry = streamElementQueueEntry;

  while (!queue.tryPut(streamElementQueueEntry)) {
+   yieldToDownstream();
+   // wait for the emitter thread to output the remaining elements
    // we wait for the emitter to notify us if the queue has space left again
-   checkpointingLock.wait();
+   checkpointingLock.wait(1);
Review comment:
In Stefan R's PR, there was no need for this `wait(1)`.
[GitHub] [flink] pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314654604 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ## @@ -771,9 +773,11 @@ public void testClosingWithBlockedEmitter() throws Exception { MockEnvironment environment = createMockEnvironment(); StreamTask containingTask = mock(StreamTask.class); + TaskMailboxImpl mailbox = new TaskMailboxImpl(); when(containingTask.getEnvironment()).thenReturn(environment); when(containingTask.getCheckpointLock()).thenReturn(lock); when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService()); + when(containingTask.getTaskMailboxExecutor(any())).thenReturn(new MailboxExecutorImpl(mailbox)); Review comment: Dropping Mockito here should be easy: ``` +import org.apache.flink.streaming.util.MockStreamTask; +import org.apache.flink.streaming.util.MockStreamTaskBuilder; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.ExceptionUtils; @@ -770,10 +772,10 @@ public class AsyncWaitOperatorTest extends TestLogger { MockEnvironment environment = createMockEnvironment(); - StreamTask containingTask = mock(StreamTask.class); - when(containingTask.getEnvironment()).thenReturn(environment); - when(containingTask.getCheckpointLock()).thenReturn(lock); - when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService()); + MockStreamTask containingTask = new MockStreamTaskBuilder(environment) + .setCheckpointLock(lock) + .setProcessingTimeService(new TestProcessingTimeService()) + .build(); StreamConfig streamConfig = new MockStreamConfig(); streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE); ``` and adding a new setter and accessor for 
`getMailboxExecutor(int ignored) -> mailbox`. If it's indeed that simple, I would prefer it as a `hotfix` commit in this PR, preferably as a base commit before your changes, so that there is no intermediate code that gets removed in a later commit. But if you do not like the rebase hassle, I would also be OK with a `hotfix` commit on top of your change.
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314654570 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ## @@ -232,6 +233,24 @@ public void setup(StreamTask containingTask, StreamConfig config, OutputThis method should be called whenever an operator would need to block the task thread to wait for output +* buffers to flush. +* If such an operator would indeed block the task thread, deadlocks can arise when two such operators are +* chained. +* It's up to the implementor of the operator to find a good trade-off, between yielding and own processing. +* If the operator never blocks the task thread during input processing, this method should not be called at +* all. +* This method indicates whether downstream events actually have been processed, which can be used to issue +* further invocations. +* +* @return true if any downstream event has been handled. +*/ + protected boolean yieldToDownstream() { + throw new NotImplementedException("API draft"); Review comment: I'm just not sure how to expose it to the users. Instead of `AbstractStreamOperator#yield()` we could have `StreamOperator#yield()`, or `StreamTask#yield()`. For the last one, `StreamTask` is accessible via `SetupableStreamOperator#setup()` and `StreamOperatorFactory#createStreamOperator()` methods.
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314316038 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException pendingStreamElementQueueEntry = streamElementQueueEntry; while (!queue.tryPut(streamElementQueueEntry)) { + yieldToDownstream(); + // wait for the emitter thread to output the remaining elements // we wait for the emitter to notify us if the queue has space left again - checkpointingLock.wait(); + checkpointingLock.wait(1); Review comment: Why `.wait(1)`? I would expect more or less the same code as Stefan R proposed in his version, but encapsulated/hidden from the user in `yieldToDownstream()`. Check my comment here: https://github.com/apache/flink/pull/9426#discussion_r314195257
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314312315 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ## @@ -232,6 +233,24 @@ public void setup(StreamTask containingTask, StreamConfig config, OutputThis method should be called whenever an operator would need to block the task thread to wait for output +* buffers to flush. +* If such an operator would indeed block the task thread, deadlocks can arise when two such operators are +* chained. +* It's up to the implementor of the operator to find a good trade-off, between yielding and own processing. +* If the operator never blocks the task thread during input processing, this method should not be called at +* all. +* This method indicates whether downstream events actually have been processed, which can be used to issue +* further invocations. +* +* @return true if any downstream event has been handled. +*/ + protected boolean yieldToDownstream() { + throw new NotImplementedException("API draft"); Review comment: nit: use `UnsupportedOperationException` instead of the Apache Commons exception. Also, the question is how we want to expose `yieldToDownstream`. Via `AbstractStreamOperator`? Maybe `StreamOperator`? Maybe we should allow access to it via the `StreamTask containingTask` passed in `StreamOperatorFactory#createStreamOperator` and `SetupableStreamOperator#setup`?
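A minimal sketch of the nit, under the assumption that only the exception type changes: `DraftOperator` and `DraftOperatorProbe` are hypothetical classes that mirror the shape of the draft method, not Flink's `AbstractStreamOperator`. `UnsupportedOperationException` lives in `java.lang`, so the placeholder needs no third-party import.

```java
/** Hypothetical operator base class; it only mirrors the draft method's shape. */
abstract class DraftOperator {
    protected boolean yieldToDownstream() {
        // JDK exception from java.lang, so no Apache Commons dependency is needed.
        throw new UnsupportedOperationException("API draft");
    }
}

/** Hypothetical subclass used only to exercise the placeholder. */
class DraftOperatorProbe extends DraftOperator {
    /** Calls the draft method and reports the message of the exception it throws. */
    String probe() {
        try {
            yieldToDownstream();
            return "no exception";
        } catch (UnsupportedOperationException e) {
            return e.getMessage();
        }
    }
}
```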
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314330128 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java ## @@ -84,10 +89,14 @@ public boolean hasMail() { @Override public Optional tryTakeMail() throws MailboxStateException { + return downstreamMailbox.tryTakeMail(); + } + + private Optional tryTakeDownstreamMail(int operatorIndex) throws MailboxStateException { Review comment: I was a bit confused about the class interactions here, and the call chain is a bit complicated (`TaskMailboxImpl#tryTakeMail` -> goes to `downstreamMailbox.tryTakeMail()` -> comes back here via `TaskMailboxImpl#tryTakeDownstreamMail`). Would it be simpler if we dropped the requirement that `TaskMailbox` and `Mailbox` have the same interface? At the topmost level I would actually prefer having to pass `operatorIndex` explicitly when calling `putMail`, to make this more explicit, with less magic. It would be more readable for me to have this lower-level API explicit, instead of having to figure out what `putMail(mail)` or `tryTakeMail()` are doing. `putMail(mail, priority)` or `tryTakeMail(priority)` is a more self-explanatory interface.
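One way the explicit-priority interface could look is sketched below. Everything here is an assumption for illustration: `PriorityMailboxSketch`, its method signatures, and the visibility rule (a taker sees mail whose priority is at least the priority it asks for) are hypothetical and not taken from the PR. The sketch is single-threaded and omits the quiesce/close state handling a real mailbox needs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;

/** Hypothetical mailbox with explicit priorities; illustrative only, not Flink API. */
class PriorityMailboxSketch {

    /** A queued action tagged with the priority it was enqueued under. */
    private static final class Mail {
        final Runnable action;
        final int priority;

        Mail(Runnable action, int priority) {
            this.action = action;
            this.priority = priority;
        }
    }

    // Not thread-safe; a real mailbox would guard this queue with a lock.
    private final Deque<Mail> queue = new ArrayDeque<>();

    /** Appends the action to the tail of the mailbox. */
    void putMail(Runnable action, int priority) {
        queue.addLast(new Mail(action, priority));
    }

    /** Inserts the action at the head of the mailbox. */
    void putFirst(Runnable action, int priority) {
        queue.addFirst(new Mail(action, priority));
    }

    /** Takes the first queued mail whose priority is at least minPriority. */
    Optional<Runnable> tryTakeMail(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                return Optional.of(mail.action);
            }
        }
        return Optional.empty();
    }

    /** Runs every mail visible at minPriority, in queue order. */
    void drain(int minPriority) {
        Optional<Runnable> mail;
        while ((mail = tryTakeMail(minPriority)).isPresent()) {
            mail.get().run();
        }
    }
}
```

With this shape the caller states the priority at every call site, so there is no hidden round trip between a task-level mailbox and per-operator views; taking with the lowest priority plays the role of reading the task mailbox directly.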
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314331799 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImplTest.java ## @@ -123,70 +123,76 @@ public void testConcurrentPutTakeNonBlockingAndWait() throws Exception { */ @Test public void testCloseUnblocks() throws InterruptedException { - testAllPuttingUnblocksInternal(Mailbox::close); + testAllPuttingUnblocksInternal(TaskMailbox::close); setUp(); - testUnblocksInternal(() -> mailbox.takeMail(), Mailbox::close); + testUnblocksInternal(() -> taskMailbox.takeMail(), TaskMailbox::close); } /** * Test that silencing the mailbox unblocks pending accesses with correct exceptions. */ @Test public void testQuiesceUnblocks() throws Exception { - testAllPuttingUnblocksInternal(Mailbox::quiesce); + testAllPuttingUnblocksInternal(TaskMailbox::quiesce); } @Test public void testLifeCycleQuiesce() throws Exception { - mailbox.putMail(() -> {}); - mailbox.putMail(() -> {}); - mailbox.quiesce(); + taskMailbox.putMail(() -> { + }); Review comment: optional nit: remove new line?
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314330433 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java ## @@ -33,4 +33,13 @@ * @throws MailboxStateException if the mailbox is quiesced or closed. */ void putMail(@Nonnull Runnable letter) throws MailboxStateException; + + /** +* Adds the given action to the head of the mailbox. +* +* @param priorityLetter action to enqueue to the head of the mailbox. +* @throws MailboxStateException if the mailbox is quiesced or closed. +*/ + void putFirst(@Nonnull Runnable priorityLetter) throws MailboxStateException; + Review comment: nit: unnecessary new line?
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314335073 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImplTest.java ## @@ -256,11 +262,54 @@ private void testPutTake( writerThread.join(); } - mailbox.putMail(POISON_LETTER); + taskMailbox.putMail(POISON_LETTER); readerThread.join(); for (int perThreadResult : results) { Assert.assertEquals(numLettersPerThread, perThreadResult); } } + + /** +* Tests the downstream view of {@link TaskMailbox}. +*/ + public static class DownstreamMailboxTest { + /** +* Object under test. +*/ + private TaskMailboxImpl taskMailbox; + + @Before + public void setUp() { + taskMailbox = new TaskMailboxImpl(); + taskMailbox.open(); + } + + @After + public void tearDown() { + taskMailbox.close(); + } + + @Test + public void testPutAsHead() throws Exception { + + Runnable instanceA = () -> {}; + Runnable instanceB = () -> {}; + Runnable instanceC = () -> {}; + Runnable instanceD = () -> {}; + + taskMailbox.getDownstreamMailbox(1).putMail(instanceC); + taskMailbox.getDownstreamMailbox(2).putMail(instanceB); + taskMailbox.getDownstreamMailbox(1).putMail(instanceD); + taskMailbox.getDownstreamMailbox(2).putFirst(instanceA); + + Assert.assertSame(instanceA, taskMailbox.getDownstreamMailbox(2).takeMail()); Review comment: Maybe we should add the same test (adding mail via the downstream mailboxes), but consuming from the `taskMailbox` directly (not via the downstream mailboxes, as this one does).
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314322494 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java ## @@ -251,4 +280,79 @@ public void quiesce() { public State getState() { return state; } + + @Override + public Mailbox getDownstreamMailbox(AbstractStreamOperator operator) { + return getDownstreamMailbox(operator.getOperatorConfig().getChainIndex()); Review comment: +1 for the @1u0 comment. The dependency on `AbstractStreamOperator` is a very heavy one and will make testing/reusing difficult. I would like to avoid it, even if `int priority` later proves to be insufficient. Keep in mind that we can always refactor this in the future, and using `int priority` now will cost us little (if any) extra work if we need to change it.
pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314315851 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ## @@ -771,9 +773,11 @@ public void testClosingWithBlockedEmitter() throws Exception { MockEnvironment environment = createMockEnvironment(); StreamTask containingTask = mock(StreamTask.class); + TaskMailboxImpl mailbox = new TaskMailboxImpl(); when(containingTask.getEnvironment()).thenReturn(environment); when(containingTask.getCheckpointLock()).thenReturn(lock); when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService()); + when(containingTask.getTaskMailboxExecutor(any())).thenReturn(new MailboxExecutorImpl(mailbox)); Review comment: arguably you are making this test worse by adding an extra mockito line here. You could try to replace it with `MockStreamTaskBuilder`.