[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r318611759 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ## @@ -1194,6 +1197,37 @@ public ExecutionConfig getExecutionConfig() { return returnStream; } + /** +* Method for passing user defined operators along with the type information that will transform the DataStream. +* +* @param operatorName name of the operator, for logging purposes +* @param outTypeInfo the output type of the operator +* @param operatorFactory the factory for the operator. +* @param type of the return stream +* @return the data stream constructed +*/ + @PublicEvolving + public SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo, + OneInputStreamOperatorFactory operatorFactory) { Review comment: Aljoscha is fine with the current state. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r318435616 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ## @@ -283,13 +287,23 @@ public void setup(TypeSerializer outputSerializer) { streamTaskStateInitializer = createStreamTaskStateManager(environment, stateBackend, processingTimeService); mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer); + if (operator instanceof SetupableStreamOperator) { ((SetupableStreamOperator) operator).setup(mockTask, config, new MockOutput(outputSerializer)); } + SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator); Review comment: This was outdated, but the code still wasn't using `StreamOperatorFactoryUtil`, so I fixed that.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r318129783 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java ## @@ -58,9 +58,10 @@ * Compare the two queues containing operator/task output by converting them to an array first. */ public static void assertOutputEquals(String message, Queue expected, Queue actual) { - Assert.assertArrayEquals(message, - expected.toArray(), - actual.toArray()); + Assert.assertEquals( Review comment: Removed them.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r318129693 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java ## @@ -72,6 +74,10 @@ protected void init() throws Exception { // re-initialize the operator with the correct collector. StreamOperatorFactory operatorFactory = configuration.getStreamOperatorFactory(getUserCodeClassLoader()); headOperator = operatorFactory.createStreamOperator(this, configuration, new CollectorWrapper<>(collector)); + if (operatorFactory instanceof YieldingOperatorFactory) { Review comment: I went with `StreamOperatorFactoryUtil` for now. Please let me know if you have a better idea.
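To illustrate the idea of moving the `instanceof` check into one utility, here is a minimal sketch. All types below (`Operator`, `OperatorFactory`, `YieldingFactory`, `FactoryUtil`, `RecordingYieldingFactory`) are hypothetical stand-ins invented for this example, not the Flink classes from the PR, and handing the executor to the factory before creating the operator is an assumption.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-ins for illustration only; these are NOT the Flink interfaces.
interface Operator {}

interface OperatorFactory {
    Operator create();
}

/** Factories whose operators need an executor for yielding implement this interface. */
interface YieldingFactory extends OperatorFactory {
    void setMailboxExecutor(Executor executor);
}

/** Keeps the instanceof check in a single place so call sites do not duplicate it. */
final class FactoryUtil {
    private FactoryUtil() {}

    static Operator createOperator(OperatorFactory factory, Executor mailboxExecutor) {
        // Assumption: the executor is passed to the factory before the operator is created.
        if (factory instanceof YieldingFactory) {
            ((YieldingFactory) factory).setMailboxExecutor(mailboxExecutor);
        }
        return factory.create();
    }
}

/** Example factory that just remembers the executor it was given. */
final class RecordingYieldingFactory implements YieldingFactory {
    Executor executor;

    @Override
    public void setMailboxExecutor(Executor executor) {
        this.executor = executor;
    }

    @Override
    public Operator create() {
        return new Operator() {};
    }
}
```

Callers such as a task or a test harness would then call `FactoryUtil.createOperator(...)` instead of repeating the check themselves.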
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r317994445 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java ## @@ -27,19 +27,19 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** - * Implementation of {@link Mailbox} inspired by {@link java.util.concurrent.ArrayBlockingQueue} and tailored towards + * Implementation of {@link TaskMailbox} inspired by {@link java.util.concurrent.ArrayBlockingQueue} and tailored towards Review comment: I changed the comment to a more general reference to a blocking queue.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r317991596 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ## @@ -1194,6 +1197,37 @@ public ExecutionConfig getExecutionConfig() { return returnStream; } + /** +* Method for passing user defined operators along with the type information that will transform the DataStream. +* +* @param operatorName name of the operator, for logging purposes +* @param outTypeInfo the output type of the operator +* @param operatorFactory the factory for the operator. +* @param type of the return stream +* @return the data stream constructed +*/ + @PublicEvolving + public SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo, + OneInputStreamOperatorFactory operatorFactory) { Review comment: I left it as PublicEvolving but added a comment that the other method is preferred.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r317991063 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ## @@ -1172,8 +1173,10 @@ public ExecutionConfig getExecutionConfig() { * @param *type of the return stream * @return the data stream constructed +* @see #transform(String, TypeInformation, OneInputStreamOperatorFactory) */ @PublicEvolving + @Deprecated public SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator operator) { Review comment: Eliminated the duplicate and removed `@Deprecated`.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r317971162 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException * Add the given stream element queue entry to the operator's stream element queue. This * operation blocks until the element has been added. * -* For that it tries to put the element into the queue and if not successful then it waits on -* the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output -* elements. The emitter is also responsible for notifying this method if the queue has capacity -* left again, by calling notifyAll on the checkpointing lock. -* * @param streamElementQueueEntry to add to the operator's queue * @param Type of the stream element queue entry's result * @throws InterruptedException if the current thread has been interrupted */ private void addAsyncBufferEntry(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - assert(Thread.holdsLock(checkpointingLock)); - - pendingStreamElementQueueEntry = streamElementQueueEntry; - while (!queue.tryPut(streamElementQueueEntry)) { - // we wait for the emitter to notify us if the queue has space left again - checkpointingLock.wait(); + yieldToDownstream(); } - - pendingStreamElementQueueEntry = null; } - private void waitInFlightInputsFinished() throws InterruptedException { - assert(Thread.holdsLock(checkpointingLock)); - + private void waitInFlightInputsFinished() { while (!queue.isEmpty()) { - // wait for the emitter thread to output the remaining elements - // for that he needs the checkpointing lock and thus we have to free it - checkpointingLock.wait(); + yieldToDownstream(); Review comment: 
The AsyncWaitOperator commit will be moved to another PR.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r317956306 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java ## @@ -72,6 +74,10 @@ protected void init() throws Exception { // re-initialize the operator with the correct collector. StreamOperatorFactory operatorFactory = configuration.getStreamOperatorFactory(getUserCodeClassLoader()); headOperator = operatorFactory.createStreamOperator(this, configuration, new CollectorWrapper<>(collector)); + if (operatorFactory instanceof YieldingOperatorFactory) { Review comment: Yes, we should find a way to avoid duplicates; I'll explore some alternatives to factory util.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r317955279 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java ## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; + +/** + * An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators needs to be created + * through a factory implementing this interface. + */ +public interface YieldingOperatorFactory { Review comment: Yes, it makes stuff easier to understand.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r317955614 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java ## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor; + +/** + * An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators needs to be created + * through a factory implementing this interface. + */ +public interface YieldingOperatorFactory { Review comment: Not needed code-wise, because we are currently resorting to `instanceof` checks.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315558967 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ## @@ -227,11 +233,73 @@ public void setup(StreamTask containingTask, StreamConfig config, OutputThis method should be called whenever an operator would need to block the task thread to wait for output Review comment: Okay, I will remove it then. But FYI, HTML5 is as strict as XML, so you need to have a closing tag.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315556403 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java ## @@ -97,22 +102,18 @@ public void run() { } } - private void output(AsyncResult asyncResult) throws InterruptedException { + /** +* Executed as a mail in the mailbox thread. Output needs to be guarded with checkpoint lock (for the time being). +* +* @param asyncResult the result to output. +*/ + private void output(AsyncResult asyncResult) { if (asyncResult.isWatermark()) { - synchronized (checkpointLock) { - AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark(); + AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark(); Review comment: Moved the poll inside the checkpoint lock.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315548601 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ## @@ -227,11 +233,73 @@ public void setup(StreamTask containingTask, StreamConfig config, OutputThis method should be called whenever an operator would need to block the task thread to wait for output Review comment: Since we are slowly adopting newer Java versions, it's quite likely that we will also switch to the newer Javadoc with HTML5. I'd prefer not to revert to HTML4 unless you think it is confusing to devs and users.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315546676 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException * Add the given stream element queue entry to the operator's stream element queue. This * operation blocks until the element has been added. * -* For that it tries to put the element into the queue and if not successful then it waits on -* the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output -* elements. The emitter is also responsible for notifying this method if the queue has capacity -* left again, by calling notifyAll on the checkpointing lock. -* * @param streamElementQueueEntry to add to the operator's queue * @param Type of the stream element queue entry's result * @throws InterruptedException if the current thread has been interrupted */ private void addAsyncBufferEntry(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - assert(Thread.holdsLock(checkpointingLock)); - - pendingStreamElementQueueEntry = streamElementQueueEntry; - while (!queue.tryPut(streamElementQueueEntry)) { - // we wait for the emitter to notify us if the queue has space left again - checkpointingLock.wait(); + yieldToDownstream(); } - - pendingStreamElementQueueEntry = null; } - private void waitInFlightInputsFinished() throws InterruptedException { - assert(Thread.holdsLock(checkpointingLock)); - + private void waitInFlightInputsFinished() { while (!queue.isEmpty()) { - // wait for the emitter thread to output the remaining elements - // for that he needs the checkpointing lock and thus we have to free it - checkpointingLock.wait(); + yieldToDownstream(); Review comment: If we do 
yield now before migrating processing timers, we can run into deadlocks. Imagine the following: asyncIO is using an external system that is currently unavailable, and the implementation relies on timeouts to occur. The queue of the AsyncWaitOperator is then full of entries that can only complete once their timeouts fire. At that point, we need to release the checkpoint lock instead of yielding (since processing timers are not migrated yet). Thus, we need to alternate between yieldToDownstream and releasing the checkpoint lock.
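The alternation described in this comment can be sketched roughly as follows. This is a simplified model, not the AsyncWaitOperator code: `YieldOrWaitLoop`, `enqueueMail`, and `timeoutOldest` are hypothetical names, the mailbox is a plain queue, and the timer is modelled as another thread that needs the same lock.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Simplified model of a wait loop that alternates between yielding and releasing the lock. */
final class YieldOrWaitLoop {
    private final Object checkpointLock = new Object();
    private final Queue<Runnable> mailbox = new ArrayDeque<>(); // guarded by checkpointLock
    private final Queue<String> buffer = new ArrayDeque<>();    // guarded by checkpointLock
    private final int capacity;

    YieldOrWaitLoop(int capacity) {
        this.capacity = capacity;
    }

    /** Runs one pending mail, if any. The caller must hold the checkpoint lock. */
    private boolean yieldToDownstream() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    void enqueueMail(Runnable mail) {
        synchronized (checkpointLock) {
            mailbox.add(mail);
            checkpointLock.notifyAll();
        }
    }

    /** Stands in for a timer that has not been migrated: it needs the lock to free an entry. */
    void timeoutOldest() {
        synchronized (checkpointLock) {
            buffer.poll();
            checkpointLock.notifyAll();
        }
    }

    void add(String element) throws InterruptedException {
        synchronized (checkpointLock) {
            while (buffer.size() >= capacity) {
                if (!yieldToDownstream()) {
                    // No mail to run: release the lock briefly so a timer thread can get in.
                    checkpointLock.wait(1);
                }
            }
            buffer.add(element);
        }
    }

    int size() {
        synchronized (checkpointLock) {
            return buffer.size();
        }
    }
}
```

If the loop only yielded and never waited on the lock, the thread calling `timeoutOldest()` could never acquire it, which is the deadlock scenario the comment describes.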
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315172658 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java ## @@ -84,10 +89,14 @@ public boolean hasMail() { @Override public Optional tryTakeMail() throws MailboxStateException { + return downstreamMailbox.tryTakeMail(); + } + + private Optional tryTakeDownstreamMail(int operatorIndex) throws MailboxStateException { Review comment: Dropped mailbox interface.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315172523 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException pendingStreamElementQueueEntry = streamElementQueueEntry; while (!queue.tryPut(streamElementQueueEntry)) { + yieldToDownstream(); + // wait for the emitter thread to output the remaining elements // we wait for the emitter to notify us if the queue has space left again - checkpointingLock.wait(); + checkpointingLock.wait(1); Review comment: I am now also using Stefan's approach of enqueuing the result in a mailbox.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315172298 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ## @@ -771,9 +773,11 @@ public void testClosingWithBlockedEmitter() throws Exception { MockEnvironment environment = createMockEnvironment(); StreamTask containingTask = mock(StreamTask.class); + TaskMailboxImpl mailbox = new TaskMailboxImpl(); when(containingTask.getEnvironment()).thenReturn(environment); when(containingTask.getCheckpointLock()).thenReturn(lock); when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService()); + when(containingTask.getTaskMailboxExecutor(any())).thenReturn(new MailboxExecutorImpl(mailbox)); Review comment: Done
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315136528 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ## @@ -232,6 +233,24 @@ public void setup(StreamTask containingTask, StreamConfig config, OutputThis method should be called whenever an operator would need to block the task thread to wait for output +* buffers to flush. +* If such an operator would indeed block the task thread, deadlocks can arise when two such operators are +* chained. +* It's up to the implementor of the operator to find a good trade-off, between yielding and own processing. +* If the operator never blocks the task thread during input processing, this method should not be called at +* all. +* This method indicates whether downstream events actually have been processed, which can be used to issue +* further invocations. +* +* @return true if any downstream event has been handled. +*/ + protected boolean yieldToDownstream() { + throw new NotImplementedException("API draft"); Review comment: Exposing it now through #setup as discussed with Aljoscha.
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315134705 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java ## @@ -251,4 +280,79 @@ public void quiesce() { public State getState() { return state; } + + @Override + public Mailbox getDownstreamMailbox(AbstractStreamOperator operator) { + return getDownstreamMailbox(operator.getOperatorConfig().getChainIndex()); Review comment: Using int priority everywhere now.
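As a rough illustration of what addressing mail by an int priority could look like, here is a minimal sketch. `PriorityMailbox` and its methods are hypothetical and not the `TaskMailboxImpl` API; it assumes that the priority corresponds to the operator's chain index and that a caller only takes mail whose priority is at least its own.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Optional;

/** Hypothetical mailbox keyed by an int priority instead of per-operator mailbox objects. */
final class PriorityMailbox {

    private static final class Mail {
        final int priority;
        final Runnable action;

        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    private final LinkedList<Mail> queue = new LinkedList<>();

    synchronized void put(int priority, Runnable action) {
        queue.addLast(new Mail(priority, action));
    }

    /**
     * Removes and returns the oldest mail whose priority is at least minPriority.
     * Assumption: higher priority values belong to operators further down the chain.
     */
    synchronized Optional<Runnable> tryTake(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                return Optional.of(mail.action);
            }
        }
        return Optional.empty();
    }
}
```

With this shape, a yielding operator would call `tryTake` with a threshold derived from its own chain index, so it never runs mail that belongs to operators upstream of it.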
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315107092 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException * Add the given stream element queue entry to the operator's stream element queue. This * operation blocks until the element has been added. * -* For that it tries to put the element into the queue and if not successful then it waits on -* the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output -* elements. The emitter is also responsible for notifying this method if the queue has capacity -* left again, by calling notifyAll on the checkpointing lock. -* * @param streamElementQueueEntry to add to the operator's queue * @param Type of the stream element queue entry's result * @throws InterruptedException if the current thread has been interrupted */ private void addAsyncBufferEntry(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - assert(Thread.holdsLock(checkpointingLock)); - - pendingStreamElementQueueEntry = streamElementQueueEntry; - while (!queue.tryPut(streamElementQueueEntry)) { - // we wait for the emitter to notify us if the queue has space left again - checkpointingLock.wait(); + yieldToDownstream(); } - - pendingStreamElementQueueEntry = null; } - private void waitInFlightInputsFinished() throws InterruptedException { - assert(Thread.holdsLock(checkpointingLock)); - + private void waitInFlightInputsFinished() { while (!queue.isEmpty()) { - // wait for the emitter thread to output the remaining elements - // for that he needs the checkpointing lock and thus we have to free it - checkpointingLock.wait(); + yieldToDownstream(); Review comment: You are 
right, now that the emitter thread always enqueues results, that would work fine.
[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r315070914 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException * Add the given stream element queue entry to the operator's stream element queue. This * operation blocks until the element has been added. * -* For that it tries to put the element into the queue and if not successful then it waits on -* the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output -* elements. The emitter is also responsible for notifying this method if the queue has capacity -* left again, by calling notifyAll on the checkpointing lock. -* * @param streamElementQueueEntry to add to the operator's queue * @param Type of the stream element queue entry's result * @throws InterruptedException if the current thread has been interrupted */ private void addAsyncBufferEntry(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { - assert(Thread.holdsLock(checkpointingLock)); - - pendingStreamElementQueueEntry = streamElementQueueEntry; - while (!queue.tryPut(streamElementQueueEntry)) { - // we wait for the emitter to notify us if the queue has space left again - checkpointingLock.wait(); + yieldToDownstream(); } - - pendingStreamElementQueueEntry = null; } - private void waitInFlightInputsFinished() throws InterruptedException { - assert(Thread.holdsLock(checkpointingLock)); - + private void waitInFlightInputsFinished() { while (!queue.isEmpty()) { - // wait for the emitter thread to output the remaining elements - // for that he needs the checkpointing lock and thus we have to free it - checkpointingLock.wait(); + yieldToDownstream(); Review comment: Yes, and we 
have three options to solve that:
* some sleep(1)
* add some mutexes and conditions to be notified when new stuff arrives
* find a cool CompletableFuture wrapper around the mutexes and conditions (yieldAsync and putAsync, then wait for the first one to complete)
Which one would you prefer?
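For illustration, waiting for the first of two futures to complete could look roughly like the sketch below. It is not Flink code: `awaitFirst` and the two future parameters are hypothetical stand-ins for whatever `yieldAsync` and `putAsync` would return, and only `CompletableFuture.anyOf` is an actual JDK call.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch, not Flink code: block until the first of two events happens. */
final class FirstToCompleteSketch {

    /**
     * Waits until either mail has arrived or the queue has space again.
     * Both futures are assumed to be completed by other components.
     */
    static String awaitFirst(CompletableFuture<?> mailArrived, CompletableFuture<?> spaceFreed) {
        // anyOf completes as soon as one of the given futures completes.
        CompletableFuture.anyOf(mailArrived, spaceFreed).join();
        // Prefer the put when both are ready, since that ends the wait loop.
        return spaceFreed.isDone() ? "put" : "yield";
    }

    public static void main(String[] args) {
        CompletableFuture<Void> mailArrived = new CompletableFuture<>();
        CompletableFuture<Void> spaceFreed = new CompletableFuture<>();
        // Simulate another thread signalling that a mail arrived.
        new Thread(() -> mailArrived.complete(null)).start();
        System.out.println(awaitFirst(mailArrived, spaceFreed));
    }
}
```

The caller would then either retry the put or process one mail, depending on the returned value, and loop again.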
[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314638184 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException pendingStreamElementQueueEntry = streamElementQueueEntry; while (!queue.tryPut(streamElementQueueEntry)) { + yieldToDownstream(); + // wait for the emitter thread to output the remaining elements // we wait for the emitter to notify us if the queue has space left again - checkpointingLock.wait(); + checkpointingLock.wait(1); Review comment: wait(1) is needed so that the emitter thread gets a chance to do something (I am working on getting rid of the emitter thread in a separate effort). A full wait would of course not allow us to interleave mailbox messages.
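To make the effect of the timed wait concrete, here is a minimal standalone sketch. It is not the operator's code: the class, `putWithInterleaving`, `runDemo`, and the plain `Queue<Runnable>` used as a mailbox are all hypothetical, and a simulated emitter thread stands in for the real one.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch, not Flink code: a put loop that interleaves mail with a timed wait. */
final class TimedWaitLoopSketch {

    /** Puts the element, running pending mail between attempts; returns the number of mails run. */
    static <T> int putWithInterleaving(
            Object lock, BlockingQueue<T> queue, T element, Queue<Runnable> mailbox)
            throws InterruptedException {
        if (!Thread.holdsLock(lock)) {
            throw new IllegalStateException("caller must hold the lock");
        }
        int mailsRun = 0;
        while (!queue.offer(element)) {
            // Stand-in for yieldToDownstream(): run one pending mail, if any.
            Runnable mail = mailbox.poll();
            if (mail != null) {
                mail.run();
                mailsRun++;
            }
            // Timed wait: releases the lock so the emitter can drain the queue,
            // but returns after about 1 ms so that new mail is not starved.
            lock.wait(1);
        }
        return mailsRun;
    }

    /** Runs the loop against a full queue that a simulated emitter thread drains. */
    static int runDemo() {
        Object lock = new Object();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("in-flight");
        Queue<Runnable> mailbox = new ArrayDeque<>();
        mailbox.add(() -> { });
        // The emitter needs the lock, so it can only run while the loop is waiting.
        Thread emitter = new Thread(() -> {
            synchronized (lock) {
                queue.poll();
                lock.notifyAll();
            }
        });
        try {
            int mailsRun;
            synchronized (lock) {
                emitter.start();
                mailsRun = putWithInterleaving(lock, queue, "new", mailbox);
            }
            emitter.join();
            return mailsRun;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("mails run while waiting: " + runDemo());
    }
}
```

With an untimed `wait()` in the same loop, mail that arrives after the loop has started waiting would only be picked up once the emitter calls `notifyAll`.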
[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314637896 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ## @@ -232,6 +233,24 @@ public void setup(StreamTask containingTask, StreamConfig config, OutputThis method should be called whenever an operator would need to block the task thread to wait for output +* buffers to flush. +* If such an operator would indeed block the task thread, deadlocks can arise when two such operators are +* chained. +* It's up to the implementor of the operator to find a good trade-off, between yielding and own processing. +* If the operator never blocks the task thread during input processing, this method should not be called at +* all. +* This method indicates whether downstream events actually have been processed, which can be used to issue +* further invocations. +* +* @return true if any downstream event has been handled. +*/ + protected boolean yieldToDownstream() { + throw new NotImplementedException("API draft"); Review comment: I wanted to expose yieldToDownstream only to operator developers (thus protected). That also means that I cannot add it to the interface. I didn't get the last part: do you want to inject it through the factory?
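As a rough illustration of the contract described in the Javadoc above (the boolean result "can be used to issue further invocations"), consider the sketch below. It is not the drafted API: the class, the in-memory event queue, and `yieldUntilIdle` are hypothetical, and only the method name and its boolean return are taken from the diff.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch, not Flink code: using the boolean result of a yield call. */
class YieldingOperatorSketch {
    // Stand-in for the downstream part of the task's mailbox.
    private final Queue<Runnable> downstreamEvents = new ArrayDeque<>();

    void enqueueDownstreamEvent(Runnable event) {
        downstreamEvents.add(event);
    }

    /** Handles at most one pending downstream event; returns true if one was handled. */
    protected boolean yieldToDownstream() {
        Runnable event = downstreamEvents.poll();
        if (event == null) {
            return false;
        }
        event.run();
        return true;
    }

    /** Keeps yielding while the previous call reported progress; returns the number handled. */
    protected int yieldUntilIdle() {
        int handled = 0;
        while (yieldToDownstream()) {
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        YieldingOperatorSketch operator = new YieldingOperatorSketch();
        for (int i = 0; i < 3; i++) {
            int id = i;
            operator.enqueueDownstreamEvent(() -> System.out.println("handled event " + id));
        }
        System.out.println("events handled: " + operator.yieldUntilIdle());
    }
}
```

Whether an operator should yield until idle or only once per wait iteration is the trade-off the Javadoc leaves to the implementor.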
[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314637375 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ## @@ -771,9 +773,11 @@ public void testClosingWithBlockedEmitter() throws Exception { MockEnvironment environment = createMockEnvironment(); StreamTask containingTask = mock(StreamTask.class); + TaskMailboxImpl mailbox = new TaskMailboxImpl(); when(containingTask.getEnvironment()).thenReturn(environment); when(containingTask.getCheckpointLock()).thenReturn(lock); when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService()); + when(containingTask.getTaskMailboxExecutor(any())).thenReturn(new MailboxExecutorImpl(mailbox)); Review comment: Yes, but afaik, it would require quite a bit of refactoring, which I'd do in another PR.
[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314300937 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java ## @@ -251,4 +280,79 @@ public void quiesce() { public State getState() { return state; } + + @Override + public Mailbox getDownstreamMailbox(AbstractStreamOperator operator) { + return getDownstreamMailbox(operator.getOperatorConfig().getChainIndex()); Review comment: That's what I had in the first place. However, with respect to a possible branched operator chain (Piotr mentioned that this may come), I wanted to make the priority and its type an implementation detail. Nevertheless, I see your point about the cyclic coupling and would revert to an int priority. Alternatively, we could also take the StreamConfig to avoid the cyclic dependency graph.
[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314299848 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ## @@ -402,8 +402,10 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException pendingStreamElementQueueEntry = streamElementQueueEntry; while (!queue.tryPut(streamElementQueueEntry)) { + yieldToDownstream(); Review comment: Yes, I can do that, but I would first like to get general feedback on the design. Btw, the commit order has been shuffled by GitHub for some reason. This is actually the last commit.
[GitHub] [flink] AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops URL: https://github.com/apache/flink/pull/9383#discussion_r314298805 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java ## @@ -1,73 +1,11 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ Review comment: Oh shoot. Thanks.