Hi James,
it is unlikely that your issue is the same as the one Miguel is having.
His one https://issues.apache.org/jira/browse/FLINK-9242 is probably the
same as https://issues.apache.org/jira/browse/FLINK-9144 and happens
only in batch programs spilling data in Flink 1.5 and 1.6 versions
before last Friday.

From the information you provided, I suppose you are running a streaming
job in Flink 1.4, do you? Your example looks like a simpler setup: can
you try to minimise it so that you can share the code and we can have a
look?


Regards
Nico

On 18/04/18 01:59, James Yu wrote:
> Miguel, I and my colleague ran into same problem yesterday.
> We were expecting Flink to get 4 inputs from Kafka and write the inputs
> to Cassandra, but the operators got stuck after the 1st input is written
> into Cassandra.
> This is how DAG looks like:
> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
> After we disable the auto chaining
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
> all 4 inputs are read from Kafka and written into Cassandra.
> We are still figuring out why the chaining causes the blocking.
> 
> 
> This is a UTF-8 formatted mail
> -----------------------------------------------
> James C.-C.Yu
> +886988713275
> 
> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra <miguel.e.coim...@gmail.com
> <mailto:miguel.e.coim...@gmail.com>>:
> 
>     Chesnay, following your suggestions I got access to the web
>     interface and also took a closer look at the debugging logs.
>     I have noticed one problem regarding the web interface port - it
>     keeps changing port now and then during my Java program's execution.
> 
>     Not sure if that is due to my program launching several job
>     executions sequentially, but the fact is that it happened.
>     Since I am accessing the web interface via tunneling, it becomes
>     rather cumbersome to keep adapting it.
> 
>     Another particular problem I'm noticing is that this exception
>     frequently pops up (debugging with log4j):
> 
>     00:17:54,368 DEBUG
>     org.apache.flink.runtime.jobmaster.slotpool.SlotPool          -
>     Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
>     org.apache.flink.util.FlinkException: Slot is being returned to the
>     SlotPool.
>             at
>     
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>             at
>     
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>             at
>     
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>             at
>     
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
>             at
>     java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
>             at
>     
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>             at
>     
> org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
>             at
>     
> org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
>             at
>     
> org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
>             at
>     
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
>             at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>             at
>     
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>             at java.lang.reflect.Method.invoke(Method.java:498)
>             at
>     
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>             at
>     
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>             at
>     
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>             at
>     
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>             at
>     akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>             at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>             at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>             at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>             at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>             at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>             at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>             at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>             at
>     scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>             at
>     
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>             at
>     scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>             at
>     
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
>     Don't know if the internals of Flink are explicitly using an
>     exception for control flow, but there are several occurrences of
>     this as time goes by.
> 
>     Regarding my program itself, I've achieved some progress.
>     In my program I need to do a sequence of series of Flink jobs, and
>     need extra care to make sure no DataSet instance from job /i/ is
>     being used in an operator in job /i + 1/.
>     I believe this was generating the waiting scenarios I describe in an
>     earlier email.
>     The bottom line is to be extra careful about when job executions are
>     actually triggered and to make sure that a DataSet which will need
>     to be used in different Flink jobs is available for example as a
>     file in secondary storage (possibly masked as a memory-mapping) and
>     is exclusively read from that source.
>     This means ensuring the job that originally produces a DataSet (for
>     reuse on a later job) assigns to it a DataSink for secondary storage.
> 
>     I'm going to keep digging taking this in account - if will report
>     back if I manage to fix everything or find a new problem.
> 
>     Thanks again,
> 
> 
> 
>     Miguel E. Coimbra
>     Email: miguel.e.coim...@gmail.com <mailto:miguel.e.coim...@ist.utl.pt>
> 
>     On 16 April 2018 at 10:26, Chesnay Schepler <ches...@apache.org
>     <mailto:ches...@apache.org>> wrote:
> 
>         ah yes, currently when you use that method the UI is started on
>         a random port. I'm currently fixing that in this PR
>         <https://github.com/apache/flink/pull/5814> that will be merged
>         today. For now you will enable logging and search for something
>         along the lines of "http://<host>:<port> was granted leadership"
> 
>         Sorry for the inconvenience.
> 
>         On 16.04.2018 15:04, Miguel Coimbra wrote:
>>         Thanks for the suggestions Chesnay, I will try them out.
>>
>>         However, I have already tried your suggestion with the
>>         dependency flink-runtime-web and nothing happened.
>>         If I understood you correctly, adding that dependency in the
>>         pom.xml would make it so the web front-end is running when I
>>         call the following line?
>>
>>         LocalEnvironment lenv = (LocalEnvironment)
>>         ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>
>>         I added flink-runtime-web  in my pom.xml, recompiled and
>>         launched the program but I simply got "Unable to connect" in
>>         my browser (Firefox) on localhost:8081.
>>         Performing wget on localhost:8081 resulted in this:
>>
>>         $ wget localhost:8081
>>         --2018-04-16 12:47:26--  http://localhost:8081/
>>         Resolving localhost (localhost)... ::1, 127.0.0.1
>>         Connecting to localhost (localhost)|::1|:8081... failed:
>>         Connection refused.
>>         Connecting to localhost (localhost)|127.0.0.1|:8081... failed:
>>         Connection refused.
>>
>>         It seems something was bound to localhost:8081 but the
>>         connection is not working for some reason.
>>         I probably am skipping some important detail.
>>         These are some of my dependencies:
>>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-java</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-core</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-gelly_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>            
>>         <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>            
>>         <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>              <groupId>org.apache.flink</groupId>
>>            
>>          
>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>              <version>${flink.version}</version>
>>         </dependency>
>>         <!--
>>         https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web
>>         
>> <https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web>
>>         -->
>>         *<dependency>
>>              <groupId>org.apache.flink</groupId>
>>            
>>          <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>              <version>${flink.version}</version>
>>         </dependency>**
>>         *
>>
>>         Have you managed to get the web front-end in local mode?
>>
>>
>>         Best regards,
>>
>>         Miguel E. Coimbra
>>         Email: miguel.e.coim...@gmail.com
>>         <mailto:miguel.e.coim...@ist.utl.pt>
>>
>>         On 16 April 2018 at 05:12, Chesnay Schepler
>>         <ches...@apache.org <mailto:ches...@apache.org>> wrote:
>>
>>             The thing with createLocalEnvironmentWithWebUI is that it
>>             requires flink-runtime-web to be on the classpath, which
>>             is rarely the class when running things in the IDE.
>>             It should work fine in the IDE if you add it as a
>>             dependency to your project. This should've been logged as
>>             a warning.
>>
>>             Chaining is unrelated to this issue as join operators are
>>             never chained to one another.
>>             Lambda functions are also not the issue, if they were the
>>             job would fail much earlier.
>>
>>             It is reasonable that T3 is blocked if T1 is blocked. T1
>>             gets no input hence produces no output, which now also
>>             blocks T3.
>>
>>             There are multiple possible explanations i can come up with:
>>             * the preceding operators are blocked on something or
>>             /really /slow
>>             * the preceding operators are actually finished, but
>>             aren't shutting down due to an implementation error
>>             * a deadlock in Flink's join logic
>>             * a deadlock in Flink's network stack
>>
>>             For the first 2 we will have to consult the UI or logs.
>>             You said you were dumping the input DataSets into files,
>>             but were they actually complete?
>>
>>             A deadlock in the network stack should appear as all
>>             existing operator threads being blocked.
>>             We can probably rule out a problem with the join logic by
>>             removing the second join and trying again.
>>
>>
>>
>>             On 16.04.2018 03:10, Miguel Coimbra wrote:
>>>             Hello,
>>>
>>>             It would seem that the function which is supposed to
>>>             launch local mode with the web front-end doesn't launch
>>>             the front-end at all...
>>>             This function seems not to be doing what it is supposed
>>>             to do, if I'm not mistaken:
>>>
>>>             LocalEnvironment lenv = (LocalEnvironment)
>>>             ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>
>>>             Regarding the preceding operators, the thread dumps I got
>>>             were pointing to a specific set of operations over
>>>             DataSet instances that were passed into my function.
>>>             Below I show the code segment and put the lines where
>>>             threads are waiting in *bold*:
>>>
>>>             public static <K, VV, EV> DataSet<Edge<K, EV>>
>>>             selectEdges(final Graph<K, VV, EV> originalGraph, final
>>>             DataSet<Vertex<K, VV>> vertices) {
>>>                 return vertices
>>>                         .joinWithHuge(originalGraph.getEdges())
>>>                         .where(0).equalTo(0)
>>>             *            .with((source, edge) -> edge)* *// Thread 1
>>>             is blocked here*
>>>                         .returns(originalGraph.getEdges().getType())
>>>                         .join(vertices)
>>>                         .where(1).equalTo(0)
>>>             *            .with((e, v) -> e) // Thread 3 is blocked here*
>>>                         .returns(originalGraph.getEdges().getType())
>>>                         .distinct(0, 1);
>>>             }
>>>
>>>             Note: the edges inside the graph originalGraph edge
>>>             DataSet are much greater in number than the elements of
>>>             the vertices DataSet, so I believe that function is being
>>>             used correctly.
>>>
>>>             I will try testing with remote (cluster) mode to have
>>>             access to the web front-end, but I have some questions
>>>             for now:
>>>
>>>             - The fact that they are blocked in different ​
>>>             JoinOperator instances that are chained, is this a result
>>>             of Flink's default pipeline mechanism?
>>>             - Could there be a problem stemming from the fact they
>>>             are both waiting on lambdas?
>>>             - I have tried dumping both DataSet variables
>>>             originalGraph and vertices into files (the ones being
>>>             used in this code), and they produced correct values
>>>             (non-empty files), so I don't have a clue what the
>>>             threads inside Flink's runtime are waiting on.
>>>
>>>             ​Thanks for the help so far Chesnay.​
>>>
>>>
>>>             Miguel E. Coimbra
>>>             Email: miguel.e.coim...@gmail.com
>>>             <mailto:miguel.e.coim...@ist.utl.pt>
>>>
>>>             ---------- Forwarded message ----------
>>>
>>>                 From: Chesnay Schepler <ches...@apache.org
>>>                 <mailto:ches...@apache.org>>
>>>                 To: user@flink.apache.org <mailto:user@flink.apache.org>
>>>                 Cc: 
>>>                 Bcc: 
>>>                 Date: Sun, 15 Apr 2018 18:54:33 +0200
>>>                 Subject: Re: Unsure how to further debug - operator
>>>                 threads stuck on java.lang.Thread.State: WAITING
>>>                 Hello,
>>>
>>>                 Thread #1-3 are waiting for input, Thread #4 is
>>>                 waiting for the job to finish.
>>>
>>>                 To further debug this I would look into what the
>>>                 preceding operators are doing, whether they are
>>>                 blocked on something or are emitting records (which
>>>                 you can check in the UI/metrics).
>>>
>>>                 On 15.04.2018 18:40, Miguel Coimbra wrote:
>>>>                 ​Hello,
>>>>
>>>>                 I am running into a situation where the Flink
>>>>                 threads responsible for my operator execution are
>>>>                 all stuck on WAITING mode.
>>>>                 Before anything else, this is my machine's spec:
>>>>
>>>>                 Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7-
>>>>                 4830  @ 2.13GHz GenuineIntel GNU/Linux
>>>>                 256 GB RAM
>>>>
>>>>                 I am running in local mode on a machine with a
>>>>                 considerable amount of memory, so perhaps that may
>>>>                 be triggering some execution edge-case?
>>>>
>>>>                 Moving on, this is my Java:
>>>>
>>>>                 openjdk version "1.8.0_151"
>>>>                 OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>>>>                 OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>>>
>>>>                 Getting back to the problem: I am currently using
>>>>                 Flink 1.5-SNAPSHOT with LocalEnvironment on this
>>>>                 large-memory machine, with parallelism set to one:
>>>>
>>>>                 Configuration conf = new Configuration();
>>>>                 LocalEnvironment lenv = (LocalEnvironment)
>>>>                 ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>                 ExecutionEnvironment env = lenv;
>>>>                 
>>>> env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
>>>>                 env.setParallelism(1);
>>>>
>>>>                 This initializes the execution environment for a
>>>>                 series of sequential jobs (any data dependency
>>>>                 between jobs is flushed to disk on job /i /and read
>>>>                 back from disk into a DataSet in job /i + 1/).
>>>>                 To reiterate, I am not launching a Flink cluster, I
>>>>                 am just executing in local mode from a code base
>>>>                 compiled with Maven.
>>>>
>>>>                 I have tested this program via mvn exec:exec with
>>>>                 different values of memory (from -Xmx20000m to
>>>>                 -Xmx120000m, from 20GB to 120GB) and the result is
>>>>                 always the same: the process' memory fills up
>>>>                 completely and then the process' CPU usage drops to 0%.
>>>>                 This is strange because if it was lack of memory, I
>>>>                 would expect an OutOfMemoryError.
>>>>
>>>>                 I have debugged with IntelliJ IDEA and obtained
>>>>                 thread dumps from different executions, and realized
>>>>                 quite a few operator threads are stuck on
>>>>                 java.lang.Thread.State: WAITING.
>>>>
>>>>                 There are four major threads that I find to be in
>>>>                 this waiting state.
>>>>                 The thread dumps I obtained show me where the wait
>>>>                 calls originated:
>>>>
>>>>                 *Number 1:
>>>>
>>>>                 *"CHAIN Join (Join at
>>>>                 selectEdges(GraphUtils.java:328)) -> Combine
>>>>                 (Distinct at selectEdges(GraphUtils.java:330))
>>>>                 (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>>>>                   java.lang.Thread.State: WAITING
>>>>                       at java.lang.Object.wait(Object.java:-1)
>>>>                       at java.lang.Object.wait(Object.java:502)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.pro
>>>>                 
>>>> <http://tors.hash.MutableHashTable.pro>cessProbeIter(MutableHashTable.java:505)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>                       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>>                 *Number 2:*
>>>>
>>>>                 "Join (Join at
>>>>                 summaryGraph(SummaryGraphBuilder.java:92))
>>>>                 (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>>>>                   java.lang.Thread.State: WAITING
>>>>                       at java.lang.Object.wait(Object.java:-1)
>>>>                       at java.lang.Object.wait(Object.java:502)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.pro
>>>>                 
>>>> <http://tors.hash.MutableHashTable.pro>cessProbeIter(MutableHashTable.java:505)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>                       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>                 *Number 3:*
>>>>
>>>>                 "Join (Join at selectEdges(GraphUtils.java:324))
>>>>                 (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>>>>                   java.lang.Thread.State: WAITING
>>>>                       at java.lang.Object.wait(Object.java:-1)
>>>>                       at java.lang.Object.wait(Object.java:502)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>                       at org.apache.flink.runtime.io
>>>>                 
>>>> <http://org.apache.flink.runtime.io>.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.pro
>>>>                 
>>>> <http://tors.hash.MutableHashTable.pro>cessProbeIter(MutableHashTable.java:505)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>                       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>                 *Number 4:*
>>>>
>>>>                 "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA
>>>>                 waiting
>>>>                   java.lang.Thread.State: WAITING
>>>>                       at sun.misc.Unsafe.park(Unsafe.java:-1)
>>>>                       at
>>>>                 
>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>                       at
>>>>                 
>>>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>>>>                       at
>>>>                 
>>>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>>>                       at
>>>>                 
>>>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>>>>                       at
>>>>                 
>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>>                       at
>>>>                 
>>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
>>>>                       at
>>>>                 
>>>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
>>>>                       - locked <0x23eb> (a java.lang.Object)
>>>>                       at
>>>>                 
>>>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>>>                       at
>>>>                 
>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
>>>>                       at
>>>>                 org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>>>                       at
>>>>                 
>>>> my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
>>>>                       at my.package.algorithm.Sample.co
>>>>                 
>>>> <http://my.package.algorithm.Sample.co>mputeApproximateDeltaFast(Sample.java:492)
>>>>                       at my.package.algorithm.Sample.ru
>>>>                 <http://my.package.algorithm.Sample.ru>n(Sample.java:291).
>>>>                       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>                 While I realize these dumps on their own may not be
>>>>                 helpful, they at least (as far as I know) indicate
>>>>                 that the threads are all waiting on something.
>>>>                 But if it was resource scarcity I believe the
>>>>                 program would terminate with an exception.
>>>>                 And if it was garbage collection activity, I believe
>>>>                 the JVM process would not be at 0% CPU usage.
>>>>
>>>>                 *Note: *I realize I didn't provide the user-code
>>>>                 code that generates the execution plan for Flink
>>>>                 which led to the contexts in which the threads are
>>>>                 waiting, but I hope it may not be necessary.
>>>>                 My problem now is that I am unsure on how to proceed
>>>>                 to further debug this issue:
>>>>                 - The assigned memory is fully used, but there are
>>>>                 no exceptions about lack of memory.
>>>>                 - The CPU usage is at 0% and all threads are all in
>>>>                 a waiting state, but I don't understand what signal
>>>>                 they're waiting for exactly.
>>>>
>>>>                 Hoping anyone might be able to give me a hint.
>>>>
>>>>                 Thank you very much for your time.
>>>>
>>>>                 Best regards,
>>>>
>>>>                 Miguel E. Coimbra
>>>
>>
>>
> 
> 
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to