Hi KristoffSC,

Sorry for the confusing error message. In short, the mailbox thread is the
task thread.

Your operator a) calls collector.collect from a different thread (the one in
which the CompletableFuture is completed). However, all Flink APIs, including
the Collector, must always be used from the task thread.
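
To make the failure concrete, here is a toy model in plain Java with no Flink dependency. `MailboxModel`, `putMail` and `processOneMail` are hypothetical names, not Flink classes; the thread check only mirrors the idea behind `TaskMailboxImpl.checkIsMailboxThread` from the quoted stack trace. Variant (1) does what operator a) does: the `thenAccept` callback runs on the pool thread that completes the future (the `AsyncSupply.run` -> `postComplete` -> `UniAccept.tryFire` frames in the quoted trace) and is rejected. Variant (2) only hands the record back to the task thread. As far as I know, Flink's async I/O operator performs this kind of hand-off internally; a plain ProcessFunction is not meant to do it itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of a task thread that owns its output; NOT Flink's real classes. */
public class MailboxModel {
    private final Thread taskThread;                        // the only thread allowed to emit
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private final List<String> emitted = new ArrayList<>(); // touched by the task thread only

    MailboxModel(Thread taskThread) {
        this.taskThread = taskThread;
    }

    /** Emits a record; like Flink's check, it fails fast on any other thread. */
    void collect(String record) {
        if (Thread.currentThread() != taskThread) {
            throw new IllegalStateException(
                "Illegal thread detected. This method must be called from inside the mailbox thread!");
        }
        emitted.add(record);
    }

    /** Safe from any thread: only enqueues work for the task thread. */
    void putMail(Runnable mail) {
        mailbox.add(mail);
    }

    /** Task thread only: blocks until one mail arrives, then runs it. */
    void processOneMail() {
        try {
            mailbox.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs both variants with the calling thread acting as the task thread. */
    static List<String> demo() {
        MailboxModel task = new MailboxModel(Thread.currentThread());
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> log = new ArrayList<>();
        try {
            // (1) Like operator a): the latch keeps the future pending until thenAccept
            // is registered, so the callback runs on the pool thread that completes it.
            CountDownLatch gate = new CountDownLatch(1);
            CompletableFuture<Void> wrong = CompletableFuture
                .supplyAsync(() -> { awaitQuietly(gate); return "a"; }, pool)
                .thenAccept(task::collect);
            gate.countDown();
            try {
                wrong.join();
            } catch (CompletionException e) {
                log.add("rejected: " + e.getCause().getClass().getSimpleName());
            }

            // (2) The callback only posts a mail; the task thread does the emitting.
            CompletableFuture
                .supplyAsync(() -> "b", pool)
                .thenAccept(r -> task.putMail(() -> task.collect(r)));
            task.processOneMail();
            log.add("emitted: " + task.emitted);
        } finally {
            pool.shutdown();
        }
        return log;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [rejected: IllegalStateException, emitted: [b]]
    }
}
```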

The only way to cross thread boundaries is async I/O, so a) must also be
implemented as an async I/O operator.
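
In Flink, the supported form of a) is an `AsyncFunction` (for example a `RichAsyncFunction`) applied with `AsyncDataStream.orderedWait(...)` or `unorderedWait(...)`. Its `asyncInvoke(input, resultFuture)` completes the given `ResultFuture` from whatever thread the library uses, instead of calling a `Collector`. The toy model below (plain Java; the `OrderedAsyncModel` class and its methods are hypothetical and are NOT Flink's `AsyncWaitOperator`) sketches why that is safe: callbacks on pool threads only post a mail, and the task thread alone emits results, in input order as `orderedWait` does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of an ordered async operator; NOT Flink's AsyncWaitOperator. */
public class OrderedAsyncModel {
    private final Thread taskThread = Thread.currentThread(); // the creating thread is the task thread
    private final Deque<CompletableFuture<String>> inFlight = new ArrayDeque<>(); // in input order
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private final List<String> output = new ArrayList<>();

    private void checkTaskThread() {
        if (Thread.currentThread() != taskThread) {
            throw new IllegalStateException("must be called from the task thread");
        }
    }

    /** Task thread: start the async call; its callback only posts a mail. */
    void processElement(String input, ExecutorService pool, long delayMillis) {
        checkTaskThread();
        CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
            sleepQuietly(delayMillis); // stand-in for the call to an external system
            return input.toUpperCase();
        }, pool);
        inFlight.addLast(result);
        result.whenComplete((value, error) -> mailbox.add(this::emitCompletedHead));
    }

    /** Task thread: emit finished results from the head only, keeping input order. */
    private void emitCompletedHead() {
        checkTaskThread();
        while (!inFlight.isEmpty() && inFlight.peekFirst().isDone()) {
            output.add(inFlight.pollFirst().join());
        }
    }

    /** Task thread: run mails until every in-flight element has been emitted. */
    List<String> drain() {
        checkTaskThread();
        try {
            while (!inFlight.isEmpty()) {
                mailbox.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return output;
    }

    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            OrderedAsyncModel op = new OrderedAsyncModel();
            op.processElement("a", pool, 200); // slowest call, but first in input order
            op.processElement("b", pool, 0);
            op.processElement("c", pool, 100);
            return op.drain();
        } finally {
            pool.shutdown();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [A, B, C]
    }
}
```

Element "a" is made the slowest on purpose (the delays are arbitrary). The output is still `[A, B, C]`, because only completed futures at the head of `inFlight` are emitted, and only ever on the task thread.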

On Tue, Nov 24, 2020 at 5:53 PM KristoffSC <krzysiek.chmielew...@gmail.com>
wrote:

> Hi,
> I faced an issue on Flink 1.11. So far it has happened only once and I cannot
> reproduce it. However, I think something is lurking there...
>
> I cannot post the full stack trace or the user code; however, I will try to
> describe the problem.
>
> The setup uses no resource groups, and the only operator-chaining restriction
> is the one mentioned below.
>
> chained task #1 - AsyncOperator with orderedWait calling a 3rd-party system,
> forwarding to
> chained task #2 - with:
> a) ProcessFunction A, which calls a multi-threaded library. In the process
> function we do:
>
>    CompletableFuture.allOf(..userCode..).thenAccept(v -> collector.collect(message))
>
> b) ProcessFunction B (no multi-threaded operations)
> c) AsyncOperator with orderedWait calling a 3rd-party system
> d) ProcessFunction
>
> Between task #1 and #2 there is a .startNewChain() to separate those two
> tasks.
>
> During load tests we got:
> Caused by: java.lang.IllegalStateException: Illegal thread detected. This method must be called from inside the mailbox thread!
>
> The question is: what does it actually mean, and when can it happen?
>
> The "full" stack trace, from which I had to remove user code:
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         user---Code---calls
>         at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         user---Code---calls
>         at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         user---Code---calls
>         at java.util.concurrent.CompletableFuture$UniAccept.tryFire(Unknown Source) [?:?]
>         at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) [?:?]
>         at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
>         at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.lang.IllegalStateException: Illegal thread detected. This method must be called from inside the mailbox thread!
>         at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:258) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:135) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:78) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addToWorkQueue(AsyncWaitOperator.java:258) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:180) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         ... 35 more
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

