Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-24 Thread Nico Kruber
>>> DataSet are much greater in number than the elements of
>>> the vertices DataSet, so I believe that function is being
>>> used correctly.
>>>
>>> I will try testing with remote (cluster) mode to have
>>> access to the web front-end, but I have some questions
>>> for now:
>>>
>>> - The fact that they are blocked in different
>>> JoinOperator instances that are chained: is this a result
>>> of Flink's default pipeline mechanism?
>>> - Could there be a problem stemming from the fact that they
>>> are both waiting on lambdas?
>>> - I have tried dumping both DataSet variables
>>> originalGraph and vertices into files (the ones being
>>> used in this code), and they produced correct values
>>> (non-empty files), so I don't have a clue what the
>>> threads inside Flink's runtime are waiting on.
>>>
>>> Thanks for the help so far, Chesnay.
>>>
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>
>>> -- Forwarded message --
>>>
>>> From: Chesnay Schepler <ches...@apache.org>
>>> To: user@flink.apache.org
>>> Cc: 
>>> Bcc: 
>>> Date: Sun, 15 Apr 2018 18:54:33 +0200
>>> Subject: Re: Unsure how to further debug - operator
>>> threads stuck on java.lang.Thread.State: WAITING
>>> Hello,
>>>
>>> Threads #1-3 are waiting for input; thread #4 is
>>> waiting for the job to finish.
>>>
>>> To further debug this I would look into what the
>>> preceding operators are doing, whether they are
>>> blocked on something or are emitting records (which
>>> you can check in the UI/metrics).
>>>
>>> On 15.04.2018 18:40, Miguel Coimbra wrote:
>>>> Hello,
>>>>
>>>> I am running into a situation where the Flink
>>>> threads responsible for my operator execution are
>>>> all stuck in the WAITING state.
>>>> Before anything else, this is my machine's spec:
>>>>
>>>> Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7-
>>>> 4830  @ 2.13GHz GenuineIntel GNU/Linux
>>>> 256 GB RAM
>>>>
>>>> I am running in local mode on a machine with a
>>>> considerable amount of memory, so perhaps that may
>>>> be triggering some execution edge-case?
>>>>
>>>> Moving on, this is my Java:
>>>>
>>>> openjdk version "1.8.0_151"
>>>> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>>>> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>>>
>>>> Getting back to the problem: I am currently using
>>>> Flink 1.5-SNAPSHOT with LocalEnvironment on this
>>>> large-memory machine, with parallelism set to one:
>>>>
>>>> Configuration conf = new Configuration();
>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>> ExecutionEnvironment env = lenv;
>>>> 
>>>> env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
>>>> env.setParallelism(1);
>>>>
>>>> This initializes the execution environment for a
>>>> series of sequential jobs (any data dependency
>>>> between jobs is flushed to disk in job i and read
>>>> back from disk into a DataSet in job i + 1).
>>>> To reiterate, I am not launching a Flin

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-23 Thread Fabian Hueske
>>>>>> There are multiple possible explanations I can come up with:
>>>>>> * the preceding operators are blocked on something or *really* slow
>>>>>> * the preceding operators are actually finished, but aren't shutting
>>>>>> down due to an implementation error
>>>>>> * a deadlock in Flink's join logic
>>>>>> * a deadlock in Flink's network stack
>>>>>>
>>>>>> For the first two, we will have to consult the UI or logs. You said you
>>>>>> were dumping the input DataSets into files, but were they actually
>>>>>> complete?
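One way to answer the completeness question is to count the records that were actually written and compare that number with what you expect (for example, with the result of DataSet.count() on the same input). The sketch below is plain Java with no Flink dependency. It assumes text output with one record per line (as written by writeAsText or writeAsCsv) and that the output path is either a single file or a directory holding one file per parallel subtask; the class name OutputLineCount is hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: counts the lines a DataSet sink wrote, assuming one record per line.
public final class OutputLineCount {

    /**
     * Counts lines under a sink's output path: either a single file, or a directory
     * holding one file per parallel subtask (sub-directories are not descended into).
     */
    static long countLines(Path output) throws IOException {
        if (Files.isRegularFile(output)) {
            try (Stream<String> lines = Files.lines(output, StandardCharsets.UTF_8)) {
                return lines.count();
            }
        }
        List<Path> files;
        try (Stream<Path> entries = Files.list(output)) {
            files = entries.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        long total = 0;
        for (Path file : files) {
            total += countLines(file);
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: OutputLineCount <output-path>");
            return;
        }
        Path output = Paths.get(args[0]);
        System.out.println(output + ": " + countLines(output) + " lines");
    }
}
```

A count that is lower than expected (or that changes between runs) would point at an upstream operator that never finished writing, rather than at the joins themselves.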
>>>>>>
>>>>>> A deadlock in the network stack should appear as all existing
>>>>>> operator threads being blocked.
>>>>>> We can probably rule out a problem with the join logic by removing
>>>>>> the second join and trying again.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 16.04.2018 03:10, Miguel Coimbra wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> It would seem that the function which is supposed to launch local
>>>>>> mode with the web front-end doesn't launch the front-end at all...
>>>>>> This function seems not to be doing what it is supposed to do, if I'm
>>>>>> not mistaken:
>>>>>>
>>>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>>
>>>>>> Regarding the preceding operators, the thread dumps I got were
>>>>>> pointing to a specific set of operations over DataSet instances that
>>>>>> were passed into my function.
>>>>>> Below I show the code segment, with comments marking the lines where
>>>>>> the threads are waiting:
>>>>>>
>>>>>> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(
>>>>>>         final Graph<K, VV, EV> originalGraph,
>>>>>>         final DataSet<Vertex<K, VV>> vertices) {
>>>>>>     return vertices
>>>>>>         .joinWithHuge(originalGraph.getEdges())
>>>>>>         .where(0).equalTo(0)
>>>>>>         .with((source, edge) -> edge) // Thread 1 is blocked here
>>>>>>         .returns(originalGraph.getEdges().getType())
>>>>>>         .join(vertices)
>>>>>>         .where(1).equalTo(0)
>>>>>>         .with((e, v) -> e) // Thread 3 is blocked here
>>>>>>         .returns(originalGraph.getEdges().getType())
>>>>>>         .distinct(0, 1);
>>>>>> }
>>>>>>
>>>>>> Note: the edge DataSet of originalGraph is much larger than the
>>>>>> vertices DataSet, so I believe joinWithHuge is being used correctly.
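To have a reference result when bisecting the job (for example after removing the second join, as Chesnay suggests), it can help to spell out what selectEdges computes. Because distinct(0, 1) removes any duplicates the joins emit for repeated vertex ids, the pipeline should reduce to: keep each edge whose source and target both appear among the vertex ids, then keep one edge per (source, target) pair. The sketch below is a plain-Java model of that, not Flink code; it assumes long vertex ids, ignores vertex values, and uses a hypothetical minimal Edge class instead of Gelly's.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical plain-Java model of selectEdges; not the Flink/Gelly implementation.
public final class SelectEdgesModel {

    /** Hypothetical stand-in for Gelly's Edge<K, EV>: source id, target id, value. */
    static final class Edge {
        final long source;
        final long target;
        final double value;

        Edge(long source, long target, double value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + source + ", " + target + ", " + value + ")";
        }
    }

    /**
     * Keeps edges whose source and target ids both occur in vertexIds (what the two
     * joins achieve), then keeps one edge per (source, target) pair, like distinct(0, 1).
     */
    static List<Edge> selectEdges(List<Edge> edges, Set<Long> vertexIds) {
        Map<List<Long>, Edge> unique = new LinkedHashMap<>();
        for (Edge e : edges) {
            if (vertexIds.contains(e.source) && vertexIds.contains(e.target)) {
                // Keeps the first edge seen per pair; Flink's distinct may keep any of them.
                unique.putIfAbsent(Arrays.asList(e.source, e.target), e);
            }
        }
        return new ArrayList<>(unique.values());
    }

    public static void main(String[] args) {
        List<Edge> edges = Arrays.asList(
                new Edge(1, 2, 0.5), new Edge(1, 2, 0.7), new Edge(2, 3, 1.0), new Edge(3, 4, 1.0));
        Set<Long> vertexIds = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        System.out.println(selectEdges(edges, vertexIds)); // prints [(1, 2, 0.5), (2, 3, 1.0)]
    }
}
```

Comparing this model's output on a small sample with the job's output is one way to separate a correctness problem from a progress problem.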
>>>>>>
>>>>>> I will try testing with remote (cluster) mode to have access to the
>>>>>> web front-end, but I have some questions for now:
>>>>>>
>>>>>> - The fact that they are blocked in different JoinOperator instances
>>>>>> that are chained: is this a result of Flink's default pipeline mechanism?
>>>>>> - Could there be a problem stemming from the fact that they are both
>>>>>> waiting on lambdas?
>>>>>> - I have tried dumping both DataSet variables originalGraph and vertices
>>>>>> into files (the ones being used in this code), and they produced
>>>>>> correct values (non-empty files), so I don't have a clue what the threads
>>>>>> inside Flink's runtime are waiting on.
>>>>>>
>>>>>> Thanks for the help so far, Chesnay.
>>>>>>
>>>>>>
>>>>>> Miguel E. Coimbra
>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>>
>>>>>> -- Forwarded message --
>>>>>>
>>>>>>> From: Chesnay Schepler <ches...@apache.org>
>>>>>>> To: user@flink.apache.org
>>>>>>> Cc:
>>>>>>> Bcc:
>>>>>>> Date: Sun, 15 Apr 2018 18:54:33 +02

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-21 Thread Miguel Coimbra
>>>> Resolving localhost (localhost)... ::1, 127.0.0.1
>>>> Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
>>>> Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.
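"Connection refused" generally means that no process was accepting TCP connections on that address and port when wget tried. A quick way to check this from Java, for example from the same program right after creating the environment, is a plain socket connect with a timeout. This is a minimal sketch; the class name PortCheck is hypothetical and 8081 is simply the port from the output above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether anything accepts TCP connections on host:port.
public final class PortCheck {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // e.g. ConnectException ("Connection refused") or SocketTimeoutException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("localhost:8081 accepting connections: "
                + isListening("localhost", 8081, 1000));
    }
}
```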
>>>>
>>>> It seems something was bound to localhost:8081, but the connection is
>>>> not working for some reason.
>>>> I am probably missing some important detail.
>>>> These are some of my dependencies:
>>>>
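>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-java</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-core</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-gelly_${scala.binary.version}</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>
>>>>
>>>> <dependency>
>>>>     <groupId>org.apache.flink</groupId>
>>>>     <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>>>     <version>${flink.version}</version>
>>>> </dependency>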
>>>>
>>>> Have you managed to get the web front-end working in local mode?
>>>>
>>>>
>>>> Best regards,
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>
>>>> On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org> wrote:
>>>>
>>>>> The thing with createLocalEnvironmentWithWebUI is that it requires
>>>>> flink-runtime-web to be on the classpath, which is rarely the case
>>>>> when running things in the IDE.
>>>>> It should work fine in the IDE if you add it as a dependency to your
>>>>> project. This should've been logged as a warning.
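A quick sanity check for this is to try loading a class from the flink-runtime-web jar at startup. The sketch below only tests whether a class can be loaded by the application's class loader. The default class name is a hypothetical placeholder, so substitute a class you know ships in the flink-runtime-web artifact of your Flink version (for example, by listing the jar's contents).

```java
// Hypothetical helper: reports whether a class is visible to the application class loader.
public final class ClasspathCheck {

    /** Returns true if the named class can be loaded, without running its static initializers. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder name (assumption): pass a class that your flink-runtime-web jar contains.
        String webClass = args.length > 0 ? args[0] : "org.example.PlaceholderWebClass";
        System.out.println(webClass + " on classpath: " + isOnClasspath(webClass));
    }
}
```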
>>>>>
>>>>> Chaining is unrelated to this issue, as join operators are never
>>>>> chained to one another.
>>>>> Lambda functions are also not the issue; if they were, the job would
>>>>> fail much earlier.
>>>>>
>>>>> It is expected that T3 is blocked if T1 is blocked: T1 gets no input
>>>>> and hence produces no output, which in turn leaves T3 without input.
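The effect described here can be reproduced without Flink. In the sketch below, two hypothetical stages forward elements between blocking queues, which stand in for network channels (this is not Flink's actual implementation). Because nothing is ever put into the first queue, T1 parks in take(), and T3, which only reads what T1 emits, parks as well; both report Thread.State.WAITING, just like the threads in the dumps.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo: an upstream stall propagates down a pipeline of blocking stages.
public final class PipelineStallDemo {

    /** Starts a daemon stage that forwards every element from in to out; take() parks while in is empty. */
    static Thread stage(String name, BlockingQueue<Integer> in, BlockingQueue<Integer> out) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    out.put(in.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit when interrupted
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Polls until t is in the WAITING state or the timeout expires. */
    static boolean awaitWaiting(Thread t, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (t.getState() == Thread.State.WAITING) {
                return true;
            }
            Thread.sleep(10);
        }
        return t.getState() == Thread.State.WAITING;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> source = new LinkedBlockingQueue<>(); // upstream never produces
        BlockingQueue<Integer> middle = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> sink = new LinkedBlockingQueue<>();
        Thread t1 = stage("T1", source, middle);
        Thread t3 = stage("T3", middle, sink);
        System.out.println("T1 waiting: " + awaitWaiting(t1, 2000));
        System.out.println("T3 waiting: " + awaitWaiting(t3, 2000));
    }
}
```

Putting a single element into the source queue unblocks both stages, which matches the idea that the root cause lies upstream of T1.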
>>>>>
>>>>> There are multiple possible explanations I can come up with:
>>>>> * the preceding operators are blocked on something or *really* slow
>>>>> * the preceding operators are actually finished, but aren't shutting
>>>>> down due to an implementation error
>>>>> * a deadlock in Flink's join logic
>>>>> * a deadlock in Flink's network stack
>>>>>
>>>>> For the first two, we will have to consult the UI or logs. You said you
>>>>> were dumping the input DataSets into files, but were they actually
>>>>> complete?
>>>>>
>>>>> A deadlock in the network stack should appear as all existing operator
>>>>> threads being blocked.
>>>>> We can probably rule out a problem with the join logic by removing the
>>>>> second join and trying again.
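The "all operator threads blocked" symptom can be sampled from inside the local-mode JVM. The sketch below assumes that task threads can be recognized by a trailing "(subtask/parallelism)" suffix such as "(1/1)", as in the thread names from the dumps in this thread (the "@9158" shown by IntelliJ is most likely its debugger object id, not part of the name). A single snapshot cannot distinguish a deadlock from a slow upstream operator, so it is only meaningful when repeated over time. The class name OperatorThreadCheck is hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical heuristic: are all task threads (recognized by name) currently WAITING?
public final class OperatorThreadCheck {

    // Assumption: task thread names end in "(subtask/parallelism)", e.g. "... (1/1)".
    static final Pattern SUBTASK_SUFFIX = Pattern.compile("\\(\\d+/\\d+\\)$");

    static boolean isOperatorThread(String name) {
        return SUBTASK_SUFFIX.matcher(name).find();
    }

    /** True if at least one operator thread exists and every one of them is WAITING right now. */
    static boolean allOperatorThreadsWaiting() {
        int count = 0;
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (!isOperatorThread(t.getName())) {
                continue;
            }
            count++;
            if (t.getState() != Thread.State.WAITING) {
                return false;
            }
        }
        return count > 0;
    }

    public static void main(String[] args) {
        System.out.println("all operator threads waiting: " + allOperatorThreadsWaiting());
    }
}
```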

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-17 Thread Miguel Coimbra

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-17 Thread James Yu

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-17 Thread Miguel Coimbra

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-16 Thread Chesnay Schepler

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-16 Thread Miguel Coimbra
>> Number 1: "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) ->
>> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158"
>> prio=5 tid=0xd93 nid=NA waiting
>>   java.lang.Thread.State: WAITING
>>   at java.lang.Object.wait(Object.java:-1)
>>   at java.lang.Object.wait(Object.java:502)
>>   at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>   at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>   at org.apache.fl

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-15 Thread Chesnay Schepler

Hello,

Threads #1-3 are waiting for input; thread #4 is waiting for the job to 
finish.


To further debug this I would look into what the preceding operators are 
doing, whether they are blocked on something or are emitting records 
(which you can check in the UI/metrics).


On 15.04.2018 18:40, Miguel Coimbra wrote:

Hello,

I am running into a situation where the Flink threads responsible for 
my operator execution are all stuck in the WAITING state.

Before anything else, this is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz 
GenuineIntel GNU/Linux

256 GB RAM

I am running in local mode on a machine with a considerable amount of 
memory, so perhaps that may be triggering some execution edge-case?


Moving on, this is my Java:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT 
with LocalEnvironment on this large-memory machine, with parallelism 
set to one:


Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);

This initializes the execution environment for a series of sequential 
jobs (any data dependency between jobs is flushed to disk in job *i* 
and read back from disk into a DataSet in job *i + 1*).
To reiterate, I am not launching a Flink cluster; I am just executing 
in local mode from a code base compiled with Maven.


I have tested this program via mvn exec:exec with different memory 
settings (from -Xmx20g to -Xmx120g, i.e. from 20GB to 120GB) and the 
result is always the same: the process's memory fills up completely and 
then the process's CPU usage drops to 0%.
This is strange, because if it were a lack of memory, I would expect an 
OutOfMemoryError.


I have debugged with IntelliJ IDEA and obtained thread dumps from 
different executions, and realized quite a few operator threads are 
stuck on java.lang.Thread.State: WAITING.


There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

*Number 1:*

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine 
(Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 
tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at java.lang.Object.wait(Object.java:502)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
  at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
  at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
  at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
  at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
  at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
  at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
  at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
  at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
  at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
  at java.lang.Thread.run(Thread.java:748)


*Number 2:*

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" 
prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at java.lang.Object.wait(Object.java:502)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
  at org.apache.flink.runtime.io 

Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-15 Thread Miguel Coimbra

Hello,

I am running into a situation where the Flink threads responsible for my
operator execution are all stuck in the WAITING state.
Before anything else, here is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz
GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of
memory; could that be triggering some execution edge case?

Moving on, this is my Java version:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT
with LocalEnvironment on this large-memory machine, with parallelism set
to one:

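import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// Local environment with the web UI, reused for all of the sequential jobs
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
// Object reuse lets Flink pass the same record instances to user functions repeatedly
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);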

This initializes the execution environment for a series of sequential jobs
(any data dependency between jobs is flushed to disk in job *i* and read
back from disk into a DataSet in job *i + 1*).
To reiterate, I am not launching a Flink cluster; I am just executing in
local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different memory
settings (from -Xmx20g to -Xmx120g, i.e. from 20GB to 120GB) and the
result is always the same: the process's memory fills up completely and
then the process's CPU usage drops to 0%.
This is strange, because if it were a lack of memory, I would expect an
OutOfMemoryError.
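One thing that may be worth ruling out (an assumption about the setup, not something stated in this thread): mvn exec:exec starts a separate java process, so an -Xmx setting only applies if it is part of that process's arguments; MAVEN_OPTS, for example, only affects Maven's own JVM. A small hypothetical helper like the following, run inside the forked process, prints what the JVM actually received.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical helper (not part of the original program): shows the -Xmx flags and
 * heap limits of the running JVM, to compare them with the intended setting.
 */
public class HeapCheck {

    private static final long MIB = 1024L * 1024L;

    /** Returns the -Xmx flags this JVM was started with (empty if none was given). */
    static List<String> xmxFlags() {
        return ManagementFactory.getRuntimeMXBean().getInputArguments().stream()
                .filter(arg -> arg.startsWith("-Xmx"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        System.out.println("-Xmx flags:       " + xmxFlags());
        // maxMemory() is the heap ceiling the JVM will attempt to use
        System.out.println("max heap   (MiB): " + rt.maxMemory() / MIB);
        System.out.println("total heap (MiB): " + rt.totalMemory() / MIB);
        System.out.println("free heap  (MiB): " + rt.freeMemory() / MIB);
    }
}
```

If the flag list is empty, or the max heap is far below the intended 20GB to 120GB, the setting most likely never reached the forked JVM.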

I have debugged with IntelliJ IDEA and obtained thread dumps from different
executions, and realized quite a few operator threads are stuck on
java.lang.Thread.State: WAITING.
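As a possible complement to dumps taken from IntelliJ IDEA, the JDK's ThreadMXBean can produce a similar report from inside the process, for example from a timer thread in the job driver. This is a hypothetical helper (the class name and output format are made up) that lists every live thread in a given state with its top frames.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: summarize all live threads in a given state with their top frames. */
public class WaitingThreadReport {

    /** Returns one summary (quoted thread name plus up to maxFrames frames) per matching thread. */
    static List<String> threadsIn(Thread.State state, int maxFrames) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> lines = new ArrayList<>();
        // false, false: skip locked monitors and synchronizers to keep the dump cheap
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null || info.getThreadState() != state) {
                continue;
            }
            StringBuilder sb = new StringBuilder("\"" + info.getThreadName() + "\"");
            StackTraceElement[] frames = info.getStackTrace();
            for (int i = 0; i < Math.min(maxFrames, frames.length); i++) {
                sb.append("\n    at ").append(frames[i]);
            }
            lines.add(sb.toString());
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : threadsIn(Thread.State.WAITING, 5)) {
            System.out.println(line);
        }
    }
}
```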

There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

*Number 1:*

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) ->
Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5
tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at java.lang.Object.wait(Object.java:502)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
  at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
  at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
  at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
  at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
  at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
  at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
  at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
  at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
  at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
  at java.lang.Thread.run(Thread.java:748)
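Reading dump Number 1 bottom-up, the frames ReusingBuildSecondHashJoinIterator, MutableHashTable.processProbeIter and MutableHashTable$ProbeIterator.next suggest the join was in its probe phase, pulling the next record of its first input, when it blocked. As a simplified, hypothetical illustration of the build-second strategy (an in-memory HashMap with no spilling, not Flink's MutableHashTable), the second input is consumed completely first and the first input is then streamed through the table:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * Hypothetical "build second" hash join. Flink's MutableHashTable additionally
 * handles memory segments, spilling to disk and object reuse; none of that is modeled.
 */
public class BuildSecondHashJoin {

    static <L, R, K, O> List<O> join(Iterator<L> probeSide, Iterable<R> buildSide,
                                     Function<L, K> probeKey, Function<R, K> buildKey,
                                     BiFunction<L, R, O> joiner) {
        // Build phase: read the second input completely into a hash table.
        Map<K, List<R>> table = new HashMap<>();
        for (R r : buildSide) {
            table.computeIfAbsent(buildKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        // Probe phase: each probeSide.next() is where a real operator would block
        // if the producer of the first input stopped emitting records.
        List<O> out = new ArrayList<>();
        while (probeSide.hasNext()) {
            L l = probeSide.next();
            for (R r : table.getOrDefault(probeKey.apply(l), Collections.emptyList())) {
                out.add(joiner.apply(l, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Edges (source, target) probed against vertex ids on the edge source.
        List<int[]> edges = Arrays.asList(new int[]{1, 2}, new int[]{2, 3}, new int[]{4, 1});
        List<String> vertices = Arrays.asList("1", "2", "3");
        List<String> result = join(edges.iterator(), vertices,
                e -> String.valueOf(e[0]), v -> v,
                (e, v) -> e[0] + "->" + e[1]);
        System.out.println(result); // [1->2, 2->3]
    }
}
```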


*Number 2:*

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153"
prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at java.lang.Object.wait(Object.java:502)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
  at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
  at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
  at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
  at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
  at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
  at org.apache.flink.runtime.operators.hash.MutableHashTable.