Re: REST: reading completed jobs' details

2018-09-06 Thread Miguel Coimbra
Exactly, that was the problem.
I didn't realize the restructured cluster channels all communications through the
REST port.
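
For reference, a minimal sketch of submitting through the REST port (an illustration only, not code from this thread; the host name and jar path are placeholders, and 8081 assumes the default rest.port):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteSubmitSketch {
    public static void main(String[] args) throws Exception {
        // 8081 is the default REST port of the standalone cluster in Flink 1.6;
        // the legacy JobManager RPC port (6123) no longer accepts job submissions.
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "localhost", 8081, "target/my-job.jar");

        DataSet<Long> numbers = env.generateSequence(1, 1000);
        System.out.println(numbers.count()); // count() triggers execution
    }
}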

Thanks again.

Best,

On Thu, 6 Sep 2018 at 17:57, Chesnay Schepler  wrote:

> Did you by chance use the RemoteEnvironment and pass in 6123 as the port?
> If so, try using 8081 instead, which is the REST port.
>
> On 06.09.2018 18:24, Miguel Coimbra wrote:
>
> Hello Chesnay,
>
> Thanks for the information.
>
> Decided to move straight away to launching a standalone cluster.
> I'm now having another problem when trying to submit a job through my Java
> program after launching the standalone cluster.
>
> I configured the cluster (flink-1.6.0/conf/flink-conf.yaml) to use 2
> TaskManager instances and assigned port ranges for most Flink cluster
> entities (to avoid port collisions with more than 1 TaskManager):
>
> query.server.ports: 30000-35000
> query.proxy.ports: 35001-40000
> taskmanager.rpc.port: 45001-50000
> taskmanager.data.port: 50001-55000
> blob.server.port: 55001-60000
>
> I'm launching in Linux with:
>
> ./start-cluster.sh
>
> Starting cluster.
> Starting standalonesession daemon on host xxx.
> Starting taskexecutor daemon on host xxx.
> [INFO] 1 instance(s) of taskexecutor are already running on xxx.
> Starting taskexecutor daemon on host xxx.
>
>
> However, my Java program ends up hanging as soon as I perform an execute()
> call (for example by calling count() on a DataSet).
>
> Checking the JobManager log, I find the following exception whenever my
> Java program calls execute() over the ExecutionEnvironment (either using
> Maven on the terminal or from IntelliJ IDEA):
>
> WARN  akka.remote.transport.netty.NettyTransport-
> Remote connection to [/127.0.0.1:47774] failed with
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
> Adjusted frame length exceeds 10485760: 1347375960 - discarded
>
> I checked that the problem is happening on a count(), so I don't think it
> has to do with the JobManager/TaskManagers trying to exchange
> excessively-big messages.
>
> While searching, I tried to make sure my program compiles with the same
> library versions as those in this cluster version of Flink.
>
>
> I downloaded the Apache Flink 1.6 binaries to launch the cluster:
>
>
>
> https://www.apache.org/dyn/closer.lua/flink/flink-1.6.0/flink-1.6.0-bin-scala_2.11.tgz
>
>
> I then checked the library versions used in the pom.xml of the 1.6.0
> branch of the Flink repository:
>
>
> https://github.com/apache/flink/blob/release-1.6/pom.xml
>
> On my project's pom.xml, I have the following:
>
> 
>UTF-8
>UTF-8
>1.8
>1.8
>1.6.0
>1.7.7
>1.2.17
>2.11.12
>2.11
>2.4.20
>4.12
>5.0.0
>${junit.version}.1
>1.0.1
>1.9.1
>
>
> My project's dependency versions match those of the Flink 1.6 repository
> (for libraries such as akka).
> However, I'm having difficulty understanding what else may be causing this
> problem.
>
> Thanks for your attention.
>
> Best,
>
> On Wed, 5 Sep 2018 at 20:18, Chesnay Schepler  wrote:
>
>> No, the cluster isn't shared. For each job a separate cluster is spun up
>> when calling execute(), at the end of which it is shut down.
>>
>> For explicit creation and shutdown of a cluster, I would suggest executing
>> your jobs as a test that contains a MiniClusterResource.
>>
>> On 05.09.2018 20:59, Miguel Coimbra wrote:
>>
>> Thanks for the reply.
>>
>> However, I think my case differs because I am running a sequence of
>> independent Flink jobs on the same environment instance.
>> I only create the LocalExecutionEnvironment once.
>>
>> The web manager shows the job ID changing correctly every time a new job
>> is executed.
>>
>> Since it is the same execution environment (and therefore the same
>> cluster instance I imagine), those completed jobs should show as well, no?
>>
>> On Wed, 5 Sep 2018 at 18:40, Chesnay Schepler  wrote:
>>
>>> When you create an environment that way, the cluster is shut down
>>> once the job completes.
>>> The WebUI can _appear_ to still be working since all the files and data
>>> about the job are cached in the browser.
>>>
>>> On 05.09.2018 17:39, Miguel Coimbra wrote:
>>>
>>> Hello,
>>>
>>> I'm having difficulty reading the status (such as time taken for each
>>> dataflow operator in a job) of jobs that have completed.
>>>
>>> First, when I click on "Completed jobs" on the web i

Re: REST: reading completed jobs' details

2018-09-06 Thread Miguel Coimbra
Hello Chesnay,

Thanks for the information.

Decided to move straight away to launching a standalone cluster.
I'm now having another problem when trying to submit a job through my Java
program after launching the standalone cluster.

I configured the cluster (flink-1.6.0/conf/flink-conf.yaml) to use 2
TaskManager instances and assigned port ranges for most Flink cluster
entities (to avoid port collisions with more than 1 TaskManager):

query.server.ports: 30000-35000
query.proxy.ports: 35001-40000
taskmanager.rpc.port: 45001-50000
taskmanager.data.port: 50001-55000
blob.server.port: 55001-60000
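
(For completeness, the port that ended up mattering for job submission is the REST port, which is configured separately and which I left at its default; a hedged example of the corresponding entry, assuming the standard key:)

rest.port: 8081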

I'm launching in Linux with:

./start-cluster.sh

Starting cluster.
Starting standalonesession daemon on host xxx.
Starting taskexecutor daemon on host xxx.
[INFO] 1 instance(s) of taskexecutor are already running on xxx.
Starting taskexecutor daemon on host xxx.


However, my Java program ends up hanging as soon as I perform an execute()
call (for example by calling count() on a DataSet).

Checking the JobManager log, I find the following exception whenever my
Java program calls execute() over the ExecutionEnvironment (either using
Maven on the terminal or from IntelliJ IDEA):

WARN  akka.remote.transport.netty.NettyTransport-
Remote connection to [/127.0.0.1:47774] failed with
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
Adjusted frame length exceeds 10485760: 1347375960 - discarded

I checked that the problem is happening on a count(), so I don't think it
has to do with the JobManager/TaskManagers trying to exchange
excessively-big messages.

While searching, I tried to make sure my program compiles with the same
library versions as those in this cluster version of Flink.


I downloaded the Apache Flink 1.6 binaries to launch the cluster:


https://www.apache.org/dyn/closer.lua/flink/flink-1.6.0/flink-1.6.0-bin-scala_2.11.tgz


I then checked the library versions used in the pom.xml of the 1.6.0 branch
of the Flink repository:


https://github.com/apache/flink/blob/release-1.6/pom.xml

On my project's pom.xml, I have the following:


   UTF-8
   UTF-8
   1.8
   1.8
   1.6.0
   1.7.7
   1.2.17
   2.11.12
   2.11
   2.4.20
   4.12
   5.0.0
   ${junit.version}.1
   1.0.1
   1.9.1



My project's dependency versions match those of the Flink 1.6 repository
(for libraries such as akka).
However, I'm having difficulty understanding what else may be causing this
problem.

Thanks for your attention.

Best,

On Wed, 5 Sep 2018 at 20:18, Chesnay Schepler  wrote:

> No, the cluster isn't shared. For each job a separate cluster is spun up
> when calling execute(), at the end of which it is shut down.
>
> For explicit creation and shutdown of a cluster, I would suggest executing
> your jobs as a test that contains a MiniClusterResource.
>
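
A rough sketch of what that suggestion could look like, assuming flink-test-utils is on the test classpath (the class and builder names below follow newer releases, where the JUnit rule is called MiniClusterWithClientResource; in 1.6 the equivalent class was MiniClusterResource, so treat the exact names as assumptions):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class MyJobTest {

    // One shared cluster for all tests in this class; it is started before the
    // first test and shut down only after the last one finishes.
    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    @Test
    public void runJob() throws Exception {
        // getExecutionEnvironment() picks up the mini cluster started above.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.generateSequence(1, 100).count();
    }
}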
> On 05.09.2018 20:59, Miguel Coimbra wrote:
>
> Thanks for the reply.
>
> However, I think my case differs because I am running a sequence of
> independent Flink jobs on the same environment instance.
> I only create the LocalExecutionEnvironment once.
>
> The web manager shows the job ID changing correctly every time a new job
> is executed.
>
> Since it is the same execution environment (and therefore the same cluster
> instance I imagine), those completed jobs should show as well, no?
>
> On Wed, 5 Sep 2018 at 18:40, Chesnay Schepler  wrote:
>
>> When you create an environment that way, the cluster is shut down
>> once the job completes.
>> The WebUI can _appear_ to still be working since all the files and data
>> about the job are cached in the browser.
>>
>> On 05.09.2018 17:39, Miguel Coimbra wrote:
>>
>> Hello,
>>
>> I'm having difficulty reading the status (such as time taken for each
>> dataflow operator in a job) of jobs that have completed.
>>
>> First, when I click on "Completed jobs" on the web interface (by default
>> at 8081), no job shows up.
>> I see jobs listed as "Running", but as soon as they finish, I would
>> expect them to appear in the "Completed jobs" section - no luck.
>>
>> Consider that I am running locally (web UI is running, I checked and it
>> is available via browser) on 8081.
>> None of these links worked for checking jobs that have already finished,
>> such as the job ID 618fac9da6ea458f5091a9c40e54cbcc that had been running:
>>
>> http://127.0.0.1:8081/jobs/618fac9da6ea458f5091a9c40e54cbcc
>> http://127.0.0.1:8081/completed-jobs/618fac9da6ea458f5091a9c40e54cbcc
>>
>> I'm running with a LocalExecutionEnvironment created with the method:
>>
>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>>
>> I hope anyone may be able to help.
>>
>> Best,
>>
>>
>>
>>
>


Re: REST: reading completed jobs' details

2018-09-05 Thread Miguel Coimbra
Thanks for the reply.

However, I think my case differs because I am running a sequence of
independent Flink jobs on the same environment instance.
I only create the LocalExecutionEnvironment once.

The web manager shows the job ID changing correctly every time a new job is
executed.

Since it is the same execution environment (and therefore the same cluster
instance I imagine), those completed jobs should show as well, no?

On Wed, 5 Sep 2018 at 18:40, Chesnay Schepler  wrote:

> When you create an environment that way, the cluster is shut down once
> the job completes.
> The WebUI can _appear_ to still be working since all the files and data
> about the job are cached in the browser.
>
> On 05.09.2018 17:39, Miguel Coimbra wrote:
>
> Hello,
>
> I'm having difficulty reading the status (such as time taken for each
> dataflow operator in a job) of jobs that have completed.
>
> First, when I click on "Completed jobs" on the web interface (by default
> at 8081), no job shows up.
> I see jobs listed as "Running", but as soon as they finish, I would
> expect them to appear in the "Completed jobs" section - no luck.
>
> Consider that I am running locally (web UI is running, I checked and it is
> available via browser) on 8081.
> None of these links worked for checking jobs that have already finished,
> such as the job ID 618fac9da6ea458f5091a9c40e54cbcc that had been running:
>
> http://127.0.0.1:8081/jobs/618fac9da6ea458f5091a9c40e54cbcc
> http://127.0.0.1:8081/completed-jobs/618fac9da6ea458f5091a9c40e54cbcc
>
> I'm running with a LocalExecutionEnvironment created with the method:
>
> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>
> I hope anyone may be able to help.
>
> Best,
>
>
>
>


REST: reading completed jobs' details

2018-09-05 Thread Miguel Coimbra
Hello,

I'm having difficulty reading the status (such as time taken for each
dataflow operator in a job) of jobs that have completed.

First, when I click on "Completed jobs" on the web interface (by default at
8081), no job shows up.
I see jobs listed as "Running", but as soon as they finish, I would
expect them to appear in the "Completed jobs" section - no luck.

Consider that I am running locally (web UI is running, I checked and it is
available via browser) on 8081.
None of these links worked for checking jobs that have already finished,
such as the job ID 618fac9da6ea458f5091a9c40e54cbcc that had been running:

http://127.0.0.1:8081/jobs/618fac9da6ea458f5091a9c40e54cbcc
http://127.0.0.1:8081/completed-jobs/618fac9da6ea458f5091a9c40e54cbcc

I'm running with a LocalExecutionEnvironment created with the method:

ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)

I hope anyone may be able to help.

Best,


InvalidTypesException: Type of TypeVariable 'K' in 'class X' could not be determined

2018-08-16 Thread Miguel Coimbra
Hello,

I have some code which compiles correctly (Flink 1.4) under Java 8.
It uses generic types.
While it compiles correctly, the execution fails with the error:

org.apache.flink.api.common.functions.InvalidTypesException: Type of
TypeVariable 'K' in 'class X' could not be determined.

This is my main:

public static void main(final String[] args) {
    X<Long> x = new X<>();
}


This is my class X:

public class X<K> {

    public X() {
        TypeInformation<K> keySelector = TypeInformation.of(new TypeHint<K>(){});
    }
}


Perhaps I'm lacking knowledge on the way Java's generics work, but why
can't Flink determine the TypeVariable of 'K'?
As I am instantiating X parameterized as a Long, that information should
eventually reach Flink and the constructor of X would be equivalent to this:

public X() {
    TypeInformation<Long> keySelector = TypeInformation.of(new TypeHint<Long>(){});
}

During execution, however, this error pops up.
What am I missing here, and what is the best way to achieve this generic
behavior in a Flink-idiomatic way?
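
For what it's worth, the usual explanation is Java type erasure: inside X's constructor, new TypeHint<K>(){} only ever captures the type variable K, never Long, so Flink has nothing concrete to work with. A minimal sketch of a common workaround, passing the type information in explicitly from the call site (an illustration, not from this thread):

import org.apache.flink.api.common.typeinfo.TypeInformation;

public class X<K> {

    private final TypeInformation<K> keyType;

    // The caller supplies the concrete type, so nothing is lost to erasure.
    public X(TypeInformation<K> keyType) {
        this.keyType = keyType;
    }

    public static void main(final String[] args) {
        X<Long> x = new X<>(TypeInformation.of(Long.class));
        System.out.println(x.keyType);
    }
}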

Thank you very much for your time.


Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-21 Thread Miguel Coimbra
Hello,

Just to provide a brief update: I got this working by moving to the stable
version 1.4.2.
I previously tested under 1.5-SNAPSHOT and 1.6-SNAPSHOT and the problem
occurred in both.

​If I'm not mistaken, LocalEnvironment is primarily ​targeted at debugging
scenarios.
In my case, I explicitly want to use it on a complex series of jobs for now.
However, it seems some sort of bug was introduced after 1.4.2?

I ask this because the same code leads to the operators getting stuck on
java.lang.Thread.State: WAITING in the snapshot versions, but it works fine
in 1.4.2.
Was there any specific design change after 1.4.2 regarding the way the
Flink cluster is simulated (LocalFlinkMiniCluster if I'm not mistaken?)
when using LocalEnvironment?

I would like to explore this issue and perhaps contribute to fixing it, or
at least understand it.

Thank you very much.​


Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>

On 17 April 2018 at 22:52, Miguel Coimbra <miguel.e.coim...@gmail.com>
wrote:

> Hello James,
>
> Thanks for the information.
> I noticed something suspicious as well: I have chains of operators where
> the first operator will ingest the expected amount of records but will not
> emit any, leaving the following operator empty in a "RUNNING" state.
> For example:
>
>
>
> I will get back if I find out more.
>
>
> Best regards,
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>
> On 17 April 2018 at 20:59, James Yu <cyu...@gmail.com> wrote:
>
>> Miguel, I and my colleague ran into same problem yesterday.
>> We were expecting Flink to get 4 inputs from Kafka and write the inputs
>> to Cassandra, but the operators got stuck after the 1st input is written
>> into Cassandra.
>> This is how DAG looks like:
>> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
>> After we disable the auto chaining
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
>> all 4 inputs are read from Kafka and
>> written into Cassandra.
>> We are still figuring out why the chaining causes the blocking.
>>
>>
>> This is a UTF-8 formatted mail
>> ---
>> James C.-C.Yu
>> +886988713275
>>
>> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>
>>> Chesnay, following your suggestions I got access to the web interface
>>> and also took a closer look at the debugging logs.
>>> I have noticed one problem regarding the web interface port - it keeps
>>> changing port now and then during my Java program's execution.
>>>
>>> Not sure if that is due to my program launching several job executions
>>> sequentially, but the fact is that it happened.
>>> Since I am accessing the web interface via tunneling, it becomes rather
>>> cumbersome to keep adapting it.
>>>
>>> Another particular problem I'm noticing is that this exception
>>> frequently pops up (debugging with log4j):
>>>
>>> 00:17:54,368 DEBUG org.apache.flink.runtime.jobma
>>> ster.slotpool.SlotPool  - Releasing slot with slot request id
>>> 9055ef473251505dac04c99727106dc9.
>>> org.apache.flink.util.FlinkException: Slot is being returned to the
>>> SlotPool.
>>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$Provide
>>> rAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>>> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlo
>>> t.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>>> at java.util.concurrent.CompletableFuture.uniHandle(Completable
>>> Future.java:822)
>>> at java.util.concurrent.CompletableFuture.uniHandleStage(Comple
>>> tableFuture.java:834)
>>> at java.util.concurrent.CompletableFuture.handle(CompletableFut
>>> ure.java:2155)
>>> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlo
>>> t.releaseSlot(SingleLogicalSlot.java:130)
>>> at org.apache.flink.runtime.executiongraph.Execution.releaseAss
>>> ignedResource(Execution.java:1239)
>>> at org.apache.flink.runtime.executiongraph.Execution.markFinish
>>> ed(Execution.java:946)
>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updat
>>> eState(ExecutionGraph.java:1588)
>>> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecu
>>> tionState(JobMaster.java:593)
>

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-17 Thread Miguel Coimbra
Hello James,

Thanks for the information.
I noticed something suspicious as well: I have chains of operators where
the first operator will ingest the expected amount of records but will not
emit any, leaving the following operator empty in a "RUNNING" state.
For example:



I will get back if I find out more.


Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>

On 17 April 2018 at 20:59, James Yu <cyu...@gmail.com> wrote:

> Miguel, I and my colleague ran into same problem yesterday.
> We were expecting Flink to get 4 inputs from Kafka and write the inputs to
> Cassandra, but the operators got stuck after the 1st input is written into
> Cassandra.
> This is how DAG looks like:
> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
> After we disable the auto chaining
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
> all 4 inputs are read from
> Kafka and written into Cassandra.
> We are still figuring out why the chaining causes the blocking.
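
For reference, a minimal sketch of the chaining switch mentioned above (DataStream API; the pipeline itself is a placeholder):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Disables operator chaining for the whole job, as in the workaround above.
        env.disableOperatorChaining();

        env.fromElements(1, 2, 3, 4)
           .filter(x -> x % 2 == 0)
           .print();
        // Alternatively, chaining can be broken after a single operator by
        // calling .disableChaining() on that operator.

        env.execute("chaining-sketch");
    }
}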
>
>
> This is a UTF-8 formatted mail
> -------
> James C.-C.Yu
> +886988713275
>
> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Chesnay, following your suggestions I got access to the web interface and
>> also took a closer look at the debugging logs.
>> I have noticed one problem regarding the web interface port - it keeps
>> changing port now and then during my Java program's execution.
>>
>> Not sure if that is due to my program launching several job executions
>> sequentially, but the fact is that it happened.
>> Since I am accessing the web interface via tunneling, it becomes rather
>> cumbersome to keep adapting it.
>>
>> Another particular problem I'm noticing is that this exception frequently
>> pops up (debugging with log4j):
>>
>> 00:17:54,368 DEBUG org.apache.flink.runtime.jobma
>> ster.slotpool.SlotPool  - Releasing slot with slot request id
>> 9055ef473251505dac04c99727106dc9.
>> org.apache.flink.util.FlinkException: Slot is being returned to the
>> SlotPool.
>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$Provide
>> rAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlo
>> t.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>> at java.util.concurrent.CompletableFuture.uniHandle(Completable
>> Future.java:822)
>> at java.util.concurrent.CompletableFuture.uniHandleStage(Comple
>> tableFuture.java:834)
>> at java.util.concurrent.CompletableFuture.handle(CompletableFut
>> ure.java:2155)
>> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlo
>> t.releaseSlot(SingleLogicalSlot.java:130)
>> at org.apache.flink.runtime.executiongraph.Execution.releaseAss
>> ignedResource(Execution.java:1239)
>> at org.apache.flink.runtime.executiongraph.Execution.markFinish
>> ed(Execution.java:946)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updat
>> eState(ExecutionGraph.java:1588)
>> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecu
>> tionState(JobMaster.java:593)
>> at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvo
>> cation(AkkaRpcActor.java:210)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage
>> (AkkaRpcActor.java:154)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleM
>> essage(FencedAkkaRpcActor.java:66)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onRece
>> ive$1(AkkaRpcActor.java:132)
>> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell
>> .scala:544)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at scala.concurrent.forkjoin.ForkJoinTask.d

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-17 Thread Miguel Coimbra
Chesnay, following your suggestions I got access to the web interface and
also took a closer look at the debugging logs.
I have noticed one problem regarding the web interface port - it keeps
changing port now and then during my Java program's execution.

Not sure if that is due to my program launching several job executions
sequentially, but the fact is that it happened.
Since I am accessing the web interface via tunneling, it becomes rather
cumbersome to keep adapting it.

Another particular problem I'm noticing is that this exception frequently
pops up (debugging with log4j):

00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Don't know if the internals of Flink are explicitly using an exception for
control flow, but there are several occurrences of this as time goes by.

Regarding my program itself, I've achieved some progress.
In my program I need to run a series of sequential Flink jobs, and I need to take
extra care to make sure no DataSet instance from job *i* is being used in
an operator in job *i + 1*.
I believe this was generating the waiting scenarios I describe in an
earlier email.
The bottom line is to be extra careful about when job executions are
actually triggered and to make sure that a DataSet which will need to be
used in different Flink jobs is available for example as a file in
secondary storage (possibly masked as a memory-mapping) and is exclusively
read from that source.
This means ensuring the job that originally produces a DataSet (for reuse
on a later job) assigns to it a DataSink for secondary storage.
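
As a rough sketch of what I mean (the file format and paths are just an example - any explicit DataSink/source pair would do):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;

public class SpillBetweenJobsSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Job i: produce an edge DataSet and materialize it with an explicit sink.
        DataSet<Tuple2<Long, Long>> edges = env.fromElements(
                Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));
        edges.writeAsCsv("file:///tmp/edges-job-i", FileSystem.WriteMode.OVERWRITE);
        env.execute("job i");

        // Job i + 1: read the edges back exclusively from that file,
        // so the plan of job i is not re-executed.
        DataSet<Tuple2<Long, Long>> reloaded = env
                .readCsvFile("file:///tmp/edges-job-i")
                .types(Long.class, Long.class);
        System.out.println(reloaded.count());
    }
}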

I'm going to keep digging with this in mind - I will report back if I
manage to fix everything or find a new problem.

Thanks again,



Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>

On 16 April 2018 at 10:26, Chesnay Schepler <ches...@apache.org> wrote:

> ah yes, currently when you use that method the UI is started on a random
> port. I'm currently fixing that in this PR
> <https://github.com/apache/flink/pull/5814> that will be merged today.
> For now you can enable logging and search for something along the lines of
> "http://: was granted leadership"
>
> Sorry for the inconvenience.
>
> On 16.04.2018 15:0

Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-16 Thread Miguel Coimbra
Thanks for the suggestions Chesnay, I will try them out.

However, I have already tried your suggestion with the dependency
flink-runtime-web and nothing happened.
If I understood you correctly, adding that dependency in the pom.xml would
make it so the web front-end is running when I call the following line?

LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

I added flink-runtime-web  in my pom.xml, recompiled and launched the
program but I simply got "Unable to connect" in my browser (Firefox) on
localhost:8081.
Performing wget on localhost:8081 resulted in this:

$ wget localhost:8081
--2018-04-16 12:47:26--  http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection
refused.

It seems something was bound to localhost:8081 but the connection is not
working for some reason.
I probably am skipping some important detail.
These are some of my dependencies:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- the newly added dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Have you managed to get the web front-end running in local mode?


Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>

On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org> wrote:

> The thing with createLocalEnvironmentWithWebUI is that it requires
> flink-runtime-web to be on the classpath, which is rarely the case when
> running things in the IDE.
> It should work fine in the IDE if you add it as a dependency to your
> project. This should've been logged as a warning.
>
> Chaining is unrelated to this issue as join operators are never chained to
> one another.
> Lambda functions are also not the issue, if they were the job would fail
> much earlier.
>
> It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input
> hence produces no output, which now also blocks T3.
>
> There are multiple possible explanations i can come up with:
> * the preceding operators are blocked on something or *really* slow
> * the preceding operators are actually finished, but aren't shutting down
> due to an implementation error
> * a deadlock in Flink's join logic
> * a deadlock in Flink's network stack
>
> For the first 2 we will have to consult the UI or logs. You said you were
> dumping the input DataSets into files, but were they actually complete?
>
> A deadlock in the network stack should appear as all existing operator
> threads being blocked.
> We can probably rule out a problem with the join logic by removing the
> second join and trying again.
>
>
>
> On 16.04.2018 03:10, Miguel Coimbra wrote:
>
> Hello,
>
> It would seem that the function which is supposed to launch local mode
> with the web front-end doesn't launch the front-end at all...
> This function seems not to be doing what it is supposed to do, if I'm not
> mistaken:
>
> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>
> Regarding the preceding operators, the thread dumps I got were pointing to
> a specific set of operations over DataSet instances that were passed into
> my function.
> Below I show the code segment and put the lines where threads are waiting
> in *bold*:
>
> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>     return vertices
>         .joinWithHuge(originalGraph.getEdges())
>         .where(0).equalTo(0)
>         *.with((source, edge) -> edge)* // Thread 1 is blocked here
>         .returns(originalGraph.getEdges().getType())
>         .join(vertices)
>         .where(1).equalTo(0)
>         *.with((e, v) -> e)* // Thread 3 is blocked here
>         .returns(originalGraph.getEdges().getType())
>         .distinct(0, 1);
> }
>
> Note: the edges in originalGraph's edge DataSet are much
> greater in number than the elements of the vertices DataSet, so I believe
> joinWithHuge is being used correctly.

Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-15 Thread Miguel Coimbra
​Hello,

I am running into a situation where the Flink threads responsible for my
operator execution are all stuck on WAITING mode.
Before anything else, this is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz
GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of
memory, so perhaps that may be triggering some execution edge-case?

Moving on, this is my Java:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT
with LocalEnvironment
on this large-memory machine, with parallelism set to one:

Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);

This initializes the execution environment for a series of sequential jobs
(any data dependency between jobs is flushed to disk on job *i *and read
back from disk into a DataSet in job *i + 1*).
To reiterate, I am not launching a Flink cluster, I am just executing in
local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different values of
memory (from -Xmx20000m to -Xmx120000m, i.e. from 20 GB to 120 GB) and the result
is always the same: the process' memory fills up completely and then the
process' CPU usage drops to 0%.
This is strange because if it was lack of memory, I would expect an
OutOfMemoryError.

I have debugged with IntelliJ IDEA and obtained thread dumps from different
executions, and realized quite a few operator threads are stuck on
java.lang.Thread.State:
WAITING.

There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:



*Number 1:*

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at java.lang.Object.wait(Object.java:502)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
  at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
  at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
  at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
  at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
  at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
  at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
  at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
  at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
  at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
  at java.lang.Thread.run(Thread.java:748)


*Number 2:*

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at java.lang.Object.wait(Object.java:502)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
  at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
  at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
  at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
  at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
  at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
  at org.apache.flink.runtime.operators.hash.MutableHashTable.

Re: How to perform efficient DataSet reuse between iterations

2017-12-07 Thread Miguel Coimbra
Hello Fabian,

It really looks like an issue requiring attention.

Since I am launching the client application via Maven, I opted to change
the maximum memory setting with
export MAVEN_OPTS="-Xms256m -Xmx40000m".
To give an example, for three (3) iterations it worked fine with around 4
GB of memory, but for ten (10), launching a client with a limit of 40 GB of
memory still resulted in the same exception.
The client application did consume memory into the 30-40 GB range before
running out of memory.

Since I am working with very big datasets, spilling to disk becomes
impractical with respect to conducting experiments.
A dataset of thirty (30) GB would take a lot of time if we were to spill to
disk a hundred times.

That is why I was asking about a caching operator or spilling intermediate
results in a BulkIteration.
You said the caching operator would not be trivial to implement.
However, would it also be very hard to allow for spilling in Flink's
BulkIteration
or DeltaIteration?

I am not sure I can use semantic annotations because the program just adds
vertices and edges to the graph (Gelly API) and runs an algorithm
implementing the GraphAlgorithm interface with ScatterFunction and
GatherFunction
class extensions.
There are some join functions though, I will look into applying them.
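
For reference, a small sketch of what such an annotation could look like on one of those join functions (an illustration with made-up field indices - they have to match the function's actual forwarding behaviour, otherwise results can become incorrect):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;

// Declares that field 0 of the first input is copied unchanged to field 0 of
// the output, which lets the optimizer avoid re-partitioning/re-sorting on it.
@FunctionAnnotation.ForwardedFieldsFirst("f0")
public class KeepFirstKeyJoin implements
        JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {

    @Override
    public Tuple2<Long, Double> join(Tuple2<Long, Double> first, Tuple2<Long, Double> second) {
        return Tuple2.of(first.f0, second.f1);
    }
}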

Besides this, can you recommend an initial place in the code where one
should look to begin studying the optimizer?

Thanks for your time once more,

Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

On 6 December 2017 at 10:27, Fabian Hueske <fhue...@gmail.com> wrote:

> Hmm, this does not look too good.
> As I expected, the program gets stuck in the optimizer. Plan optimization
> can be quite expensive for large plans.
>
> There might be a way to improve the optimization of large plans by cutting
> the plan space but I would not expect this to be fixed in the near future.
> Touching the optimizer is a delicate issue and requires a lot of care and
> effort.
>
> I would try to increase the heap size of the client JVM (check the
> /bin/flink file which starts the client JVM).
> This should bring down the GC overhead, but the computational complexity
> of enumerating plans would remain the same.
> You might want to have a look at semantic annotations [1]. Adding these to
> your user functions should have an effect on the plan enumeration.
>
> If this doesn't help, the only solution might be to cut the program into
> multiple pieces and spill intermediate results to disk.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#semantic-annotations
>
> 2017-12-06 11:10 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello Fabian,
>>
>> After increasing the message size akka parameter, the client resulted in
>> the following exception after some time.
>> This confirms that the JobManager never received the job request:
>>
>> [WARNING]
>> java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:
>> 294)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at org.apache.flink.optimizer.costs.CostEstimator.costOperator(
>> CostEstimator.java:78)
>> at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePl
>> ans(TwoInputNode.java:516)
>> at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativ
>> ePlans(SingleInputNode.java:256)
>> at org.apache.flink.optimizer.dag.WorksetIterationNode.instanti
>> ate(WorksetIterationNode.java:344)
>> at org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidat
>> es(TwoInputNode.java:557)
>> at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePl
>> ans(TwoInputNode.java:478)
>> at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativ
>> ePlans(SingleInputNode.java:256)
>> at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativ
>> ePlans(SingleInputNode.java:256)
>> at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativ
>> ePlans(SingleInputNode.java:256)
>> at org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePl
>> ans(Da

Re: How to perform efficient DataSet reuse between iterations

2017-12-05 Thread Miguel Coimbra
likely have a negative impact on the overall execution
> time.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#details-of-a-running-or-completed-job
> [2] https://stackoverflow.com/questions/33691612/apache-flink-stepwise-execution/33691957#33691957
>
>
>
> 2017-11-29 0:44 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello,
>>
>> You're right, I was overlooking that.
>> With your suggestion, I now just define a different sink in each
>> iteration of the loop.
>> Then they all output to disk when executing a single bigger plan.
>>
>> I have one more question: I know I can retrieve the total time this
>> single job takes to execute, but what if I want to know the time taken for
>> specific operators in the dag?
>> Is there some functionality in the Flink Batch API akin to counting
>> elements but for measuring time instead?
>> For example, if I am not mistaken, an operator can be executed in
>> parallel or serially (with a parallelism of one).
>>
>> Is there a straightforward way to get the time taken by the operator's
>> tasks?
>> In a way that I could:
>>
>> a) just get the time of a single task (if running serially) to get the
>> total operator execution time;
>> b) know the time taken by each parallel component of the operator's
>> execution so I could know where and what was the "lagging element" in the
>> operator's execution.
>>
>> Is this possible? I was hoping I could retrieve this information in the
>> Java program itself and avoid processing logs.
>>
>> Thanks again.
>>
>> Best regards,
>>
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>> Skype: miguel.e.coimbra
>>
>> On 28 November 2017 at 08:56, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> by calling result.count(), you compute the complete plan from the
>>> beginning and not just the operations you added since the last execution.
>>> Looking at the output you posted, each step takes about 15 seconds (with
>>> about 5 secs of initialization).
>>> So the 20 seconds of the first step include initialization + 1st step.
>>> The 35 seconds on the second step include initialization, 1st step + 2nd
>>> step.
>>> If you don't call count on the intermediate steps, you can compute the
>>> 4th step in 65 seconds.
>>>
>>> Implementing a caching operator would be a pretty huge effort because
>>> you need to touch code at many places such as the API, optimizer, runtime,
>>> scheduling, etc.
>>> The documentation you found should still be applicable. There hasn't
>>> been major additions to the DataSet API and runtime in the last releases.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2017-11-28 9:14 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>
>>>> Hello Fabian,
>>>>
>>>> Thank you for the reply.
>>>> I was hoping the situation had in fact changed.
>>>>
>>>> As far as I know, I am not calling execute() directly even once - it
>>>> is being called implicitly by simple DataSink elements added to the
>>>> plan through count():
>>>>
>>>> System.out.println(String.format("%d-th graph algorithm produced %d
>>>> elements. (%d.%d s).",
>>>> executionCounter,
>>>> *result.count()*, // this would trigger
>>>> execution...
>>>> env.getLastJobExecutionResult(
>>>> ).getNetRuntime(TimeUnit.SECONDS),
>>>> env.getLastJobExecutionResult(
>>>> ).getNetRuntime(TimeUnit.MILLISECONDS) % 1000));
>>>>
>>>>
>>>> I have taken a look at Flink's code base (e.g. how the dataflow dag is
>>>> processed with classes such as  
>>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor,
>>>> org.apache.flink.api.java.operators.OperatorTranslation) but I'm not
>>>> sure on the most direct way to achieve this.
>>>> Perhaps I missed some online documentation that would help to get a
>>>> grip on how to contribute to the different parts of Flink?
>>>>
>>>> I did find some information which hints at implementing this sort of
>>>> thing (such as adding custom 

Re: How to perform efficient DataSet reuse between iterations

2017-11-28 Thread Miguel Coimbra
Hello,

You're right, I was overlooking that.
With your suggestion, I now just define a different sink in each iteration
of the loop.
Then they all output to disk when executing a single bigger plan.
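
In code, the structure now looks roughly like this (a simplified sketch; the per-iteration computation is a placeholder):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class MultiSinkPlanSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> result = env.generateSequence(1, 1000);
        for (int i = 0; i < 4; i++) {
            // Placeholder for the real per-iteration computation.
            result = result.filter(x -> x > 0);
            // One sink per iteration; nothing is executed yet.
            result.writeAsText("file:///tmp/result-" + i, FileSystem.WriteMode.OVERWRITE);
        }

        // A single execute() runs the whole plan once, instead of re-running
        // everything from the beginning for every intermediate count() or sink.
        env.execute("all iterations in one plan");
    }
}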

I have one more question: I know I can retrieve the total time this single
job takes to execute, but what if I want to know the time taken for
specific operators in the dag?
Is there some functionality in the Flink Batch API akin to counting
elements but for measuring time instead?
For example, if I am not mistaken, an operator can be executed in parallel
or serially (with a parallelism of one).

Is there a straightforward way to get the time taken by the operator's
tasks?
In a way that I could:

a) just get the time of a single task (if running serially) to get the
total operator execution time;
b) know the time taken by each parallel component of the operator's
execution so I could know where and what was the "lagging element" in the
operator's execution.

Is this possible? I was hoping I could retrieve this information in the
Java program itself and avoid processing logs.

Thanks again.

Best regards,


Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

On 28 November 2017 at 08:56, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> by calling result.count(), you compute the complete plan from the
> beginning and not just the operations you added since the last execution.
> Looking at the output you posted, each step takes about 15 seconds (with
> about 5 secs of initialization).
> So the 20 seconds of the first step include initialization + 1st step.
> The 35 seconds on the second step include initialization, 1st step + 2nd
> step.
> If you don't call count on the intermediate steps, you can compute the 4th
> step in 65 seconds.
>
> Implementing a caching operator would be a pretty huge effort because you
> need to touch code at many places such as the API, optimizer, runtime,
> scheduling, etc.
> The documentation you found should still be applicable. There hasn't been
> major additions to the DataSet API and runtime in the last releases.
>
> Best, Fabian
>
>
>
> 2017-11-28 9:14 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello Fabian,
>>
>> Thank you for the reply.
>> I was hoping the situation had in fact changed.
>>
>> As far as I know, I am not calling execute() directly even once - it is
>> being called implicitly by simple DataSink elements added to the plan
>> through count():
>>
>> System.out.println(String.format("%d-th graph algorithm produced %d
>> elements. (%d.%d s).",
>> executionCounter,
>> *result.count()*, // this would trigger
>> execution...
>> env.getLastJobExecutionResult(
>> ).getNetRuntime(TimeUnit.SECONDS),
>> env.getLastJobExecutionResult(
>> ).getNetRuntime(TimeUnit.MILLISECONDS) % 1000));
>>
>>
>> I have taken a look at Flink's code base (e.g. how the dataflow dag is
>> processed with classes such as  
>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor,
>> org.apache.flink.api.java.operators.OperatorTranslation) but I'm not
>> sure on the most direct way to achieve this.
>> Perhaps I missed some online documentation that would help to get a grip
>> on how to contribute to the different parts of Flink?
>>
>> I did find some information which hints at implementing this sort of
>> thing (such as adding custom operators) but it was associated to an old
>> version of Flink:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/internals/add_operator.html
>> However, as far as I know there is no equivalent page in the current
>> online stable or snapshot documentation.
>>
>> What would be the best way to go about this?
>>
>> It really seems that the DataSet stored in the result variable is always
>> representing an increasing sequence of executions and not just the results
>> of the last execution.
>>
>>
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>> Skype: miguel.e.coimbra
>>
>> On 27 November 2017 at 22:56, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Miguel,
>>>
>>> I'm sorry but AFAIK, the situation has not changed.
>>>
>>> Is it possible that you are calling execute() multiple times?
>>> In that case, the 1-st and 2-nd graph would be recomputed before the
>>> 3-rd graph is computed.
>>> That would

Re: How to perform efficient DataSet reuse between iterations

2017-11-28 Thread Miguel Coimbra
Hello Fabian,

Thank you for the reply.
I was hoping the situation had in fact changed.

As far as I know, I am not calling execute() directly even once - it is
being called implicitly by simple DataSink elements added to the plan
through count():

System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
        executionCounter,
        result.count(), // this would trigger execution...
        env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
        env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));


I have taken a look at Flink's code base (e.g. how the dataflow dag is
processed with classes such as
org.apache.flink.optimizer.traversals.GraphCreatingVisitor,
org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure
on the most direct way to achieve this.
Perhaps I missed some online documentation that would help to get a grip on
how to contribute to the different parts of Flink?

I did find some information which hints at implementing this sort of thing
(such as adding custom operators) but it was associated to an old version
of Flink:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/internals/add_operator.html
However, as far as I know there is no equivalent page in the current online
stable or snapshot documentation.

What would be the best way to go about this?

It really seems that the DataSet stored in the result variable is always
representing an increasing sequence of executions and not just the results
of the last execution.



Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

On 27 November 2017 at 22:56, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Miguel,
>
> I'm sorry but AFAIK, the situation has not changed.
>
> Is it possible that you are calling execute() multiple times?
> In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd
> graph is computed.
> That would explain the increasing execution time of 15 seconds.
>
> Best, Fabian
>
> 2017-11-26 17:45 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello,
>>
>> I'm facing a problem in an algorithm where I would like to constantly
>> update a DataSet representing a graph, perform some computation, output
>> one or more DataSink (such as a file on the local system) and then reuse
>> the DataSet for a next iteration.
>> ​I want to avoid spilling the results to disk at the end of an iteration
>> and to read it back in the next iterations - the graph is very big and I do
>> not wish to incur that time overhead.
>> I want to reuse the full result DataSet of each iteration in the next
>> one and I want to save to disk a small percentage of the produced DataSet
>> for each iteration.
>> The space complexity is rather constant - the number of edges in the
>> graph increases by only 100 between iterations (which is an extremely low
>> percentage of the original graph's edges) and is obtained using
>> env.fromCollection(edgesToAdd).
>>
>> Although I am using Flink's Gelly API for graphs, I have no problem
>> working directly with the underlying vertex and edge DataSet elements.​
>>
>> Two ways to do this occur to me, but it seems both are currently not
>> supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion
>> [1]:
>>
>> «Unfortunately, it is not currently possible to output intermediate
>> results from a bulk iteration. You can only output the final result at the
>> end of the iteration. Also, as you correctly noticed, Flink cannot
>> efficiently unroll a while-loop or for-loop, so that won't work either.»
>>
>> *1.* I thought I could create a bulk iteration, perform the computation
>> and between iterations, output the result to the file system.
>> However, this is not possible, as per Vasia's answer, and produces the
>> following exception on execution when I try (for example, to calculate a
>> centrality metric for every vertex and dump the results to disk), as
>> expected based on that information:
>>
>> org.apache.flink.api.common.InvalidProgramException: A data set that is
>> part of an iteration was used as a sink or action. Did you forget to close
>> the iteration?
>>
>> *2.* Using a for loop in my own program and triggering sequential Flink
>> job executions.
>> Problem: in this scenario, while I am able to use a DataSet produced in
>> an iteration's Flink job (and dump the desired output information to disk)
>> and pass it to the next Flink job, the computation t

How to perform efficient DataSet reuse between iterations

2017-11-26 Thread Miguel Coimbra
Hello,

I'm facing a problem in an algorithm where I would like to constantly
update a DataSet representing a graph, perform some computation, output one
or more DataSink (such as a file on the local system) and then reuse
the DataSet
for a next iteration.
​I want to avoid spilling the results to disk at the end of an iteration
and to read it back in the next iterations - the graph is very big and I do
not wish to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one
and I want to save to disk a small percentage of the produced DataSet for
each iteration.
The space complexity is rather constant - the number of edges in the graph
increases by only 100 between iterations (which is an extremely low
percentage of the original graph's edges) and is obtained using
env.fromCollection(edgesToAdd).
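
Roughly along these lines, for illustration (a toy graph stands in for the real 33511-vertex one; Gelly's addEdges takes the new-edge list directly and turns it into a DataSet internally):

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;

public class IncrementalGraphSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Initial graph (placeholder data).
        Graph<Long, Double, Double> graph = Graph.fromCollection(
                Arrays.asList(new Vertex<>(1L, 1.0), new Vertex<>(2L, 1.0)),
                Arrays.asList(new Edge<>(1L, 2L, 1.0)),
                env);

        // The ~100 new edges per iteration, merged into the existing graph.
        List<Edge<Long, Double>> edgesToAdd = Arrays.asList(new Edge<>(2L, 1L, 1.0));
        graph = graph.addEdges(edgesToAdd);

        System.out.println(graph.numberOfEdges());
    }
}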

Although I am using Flink's Gelly API for graphs, I have no problem working
directly with the underlying vertex and edge DataSet elements.​

Two ways to do this occur to me, but it seems both are currently not
supported in Flink, as per Vasia's answer to this Stack Overflow quest​ion
[1]:

«Unfortunately, it is not currently possible to output intermediate results
from a bulk iteration. You can only output the final result at the end of
the iteration. Also, as you correctly noticed, Flink cannot efficiently
unroll a while-loop or for-loop, so that won't work either.»

*1.* I thought I could create a bulk iteration, perform the computation and
between iterations, output the result to the file system.
However, this is not possible, as per Vasia's answer, and produces the
following exception on execution when I try (for example, to calculate a
centrality metric for every vertex and dump the results to disk), as
expected based on that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is
part of an iteration was used as a sink or action. Did you forget to close
the iteration?

*2.* Using a for loop in my own program and triggering sequential Flink job
executions.
Problem: in this scenario, while I am able to use a DataSet produced in an
iteration's Flink job (and dump the desired output information to disk) and
pass it to the next Flink job, the computation time increases constantly:
(I also tried manually starting a session which is kept open with
env.startNewSession() before the loop - no impact)

Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. *(20.96 s)*.
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th graph algorithm produced 33536 elements. *(35.913 s)*.
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th graph algorithm produced 33543 elements. *(49.624 s)*.
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th graph algorithm produced 33557 elements. *(66.209 s)*.

Note that the number of elements in the output DataSet is equal to the
number of vertices in the graph.
On iteration i in my program, the executed graph algorithm incorporates the
result DataSet of iteration i - 1 by means of the
g.joinWithVertices(previousResultDataSet,
new RanksJoinFunction()) function.

The VertexJoinFunction is a simple forwarding mechanism to set the previous
values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        return inputValue;
    }
}
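For context, this is roughly the shape of the sequential-job driver described in
point 2 above (a sketch only: the toy edges, paths and computeMetric() placeholder
stand in for the real dataset and centrality algorithm):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexJoinFunction;

public class SequentialJobsSketch {

    // Placeholder for the per-pass computation; it must yield one (id, score) pair per vertex.
    static DataSet<Tuple2<Long, Double>> computeMetric(Graph<Long, Double, Double> g) {
        return g.getVertices().map(new MapFunction<Vertex<Long, Double>, Tuple2<Long, Double>>() {
            @Override
            public Tuple2<Long, Double> map(Vertex<Long, Double> v) {
                return new Tuple2<>(v.getId(), v.getValue());
            }
        });
    }

    // Same forwarding join function as shown above.
    @FunctionAnnotation.ForwardedFieldsFirst("*->*")
    private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
        @Override
        public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
            return inputValue;
        }
    }

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Graph<Long, Double, Double> graph = Graph.fromTupleDataSet(
                env.fromElements(new Tuple3<>(1L, 2L, 1.0), new Tuple3<>(2L, 3L, 1.0)),
                new MapFunction<Long, Double>() {
                    @Override
                    public Double map(Long id) { return 1.0; } // initial vertex value
                },
                env);

        DataSet<Tuple2<Long, Double>> previousResult = null;
        for (int i = 0; i < 3; i++) {
            if (previousResult != null) {
                // Feed the previous pass's result back into the graph, as in the snippet above.
                graph = graph.joinWithVertices(previousResult, new RanksJoinFunction());
            }
            DataSet<Tuple2<Long, Double>> result = computeMetric(graph);
            result.writeAsCsv("/tmp/pass-" + i, FileSystem.WriteMode.OVERWRITE);
            previousResult = result;
            // Each pass runs as an independent Flink job; the next job re-computes
            // the lineage of previousResult from scratch, which is the growing cost
            // described in the timings above.
            env.execute("pass " + i);
        }
    }
}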

​I have also used Flink's plan visualizer to check for discrepancies
between the first iteration and the tenth (for example), but the layout of
the plan remains exactly the same while the execution time continually
increases for what should be the same amount of computations.

*Bottom-line:* I was hoping someone could tell me how to overcome the
performance bottleneck of the sequential job approach, or how to enable the
output of intermediate results from Flink's bulk iterations.
I believe others have stumbled upon this limitation before [2, 3].
I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a
local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.
createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);

I wish to execute this in a cluster later on with a bigger dataset, so it is
essential to maximize the ability to reuse the DataSets that are
distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck that I described.
​Hopefully someone may shed light on this.​

​Thanks for your 

Re: Latest stable release binaries - broken links?

2017-04-10 Thread Miguel Coimbra
Yes, it seems it was really just a mistake on my part.
The link ending in ".tgz" made me think the archives were wrong.

Thanks and sorry for the false alarm.

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

On 10 April 2017 at 14:39, Adam Shannon <adam.shan...@banno.com> wrote:

> The set of 4 pages you linked load the list of mirrors to download from.
>
> On Mon, Apr 10, 2017 at 6:50 AM, Miguel Coimbra <
> miguel.e.coim...@gmail.com> wrote:
>
>> Hello,
>>
>> Perhaps just a mistake on my part, but at
>> http://flink.apache.org/downloads.html#binaries for the 1.2.0 binaries,
>> all the download links seem to be broken:
>>
>> http://www.apache.org/dyn/closer.lua/flink/flink-1.2.0/flink-1.2.0-bin-hadoop2-scala_2.10.tgz
>> http://www.apache.org/dyn/closer.lua/flink/flink-1.2.0/flink-1.2.0-bin-hadoop2-scala_2.11.tgz
>> http://www.apache.org/dyn/closer.lua/flink/flink-1.2.0/flink-1.2.0-bin-hadoop27-scala_2.10.tgz
>> http://www.apache.org/dyn/closer.lua/flink/flink-1.2.0/flink-1.2.0-bin-hadoop27-scala_2.11.tgz
>>
>> Every archive I download is 15.5 KB and I can't decompress it.
>>
>> This is the official page with links for 1.2.0 binary versions, correct?
>>
>> Kind regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>> Skype: miguel.e.coimbra
>>
>
>
>
> --
> Adam Shannon | Software Engineering Supervisor |  Banno | Jack Henry
> 317 6th Ave, Suite 201 Des Moines, IA 50309 | Ext: x 440198 | Cell:
> 515.867.8337
>


Latest stable release binaries - broken links?

2017-04-10 Thread Miguel Coimbra
Hello,

Perhaps just a mistake on my part, but at
http://flink.apache.org/downloads.html#binaries for the 1.2.0 binaries, all
the download links seem to be broken:

http://www.apache.org/dyn/closer.lua/flink/flink-1.2.0/flink-1.2.0-bin-hadoop2-scala_2.10.tgz
http://www.apache.org/dyn/closer.lua/flink/flink-1.2.0/flink-1.2.0-bin-hadoop2-scala_2.11.tgz
http://www.apache.org/dyn/closer.lua/flink/flink-1.2.0/flink-1.2.0-bin-hadoop27-scala_2.10.tgz
http://www.apache.org/dyn/closer.lua/flink/flink-1.2.0/flink-1.2.0-bin-hadoop27-scala_2.11.tgz

Every archive I download is 15.5 KB and I can't decompress it.

This is the official page with links for 1.2.0 binary versions, correct?

Kind regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra


Re: Apache Flink 1.1.4 - Gelly - LocalClusteringCoefficient - Returning values above 1?

2017-01-21 Thread Miguel Coimbra
y, it looks like you could
> simply add reverse edges to your input file (with an optional ' | sort |
> uniq' following):
>
> $ cat edges.txt | awk ' { print $1, $2; print $2, $1 } '
>
> The drivers are being reworked for 1.3 to better reuse code and options
> which will better support additional drivers and algorithms and make
> documentation simpler.
>
> Greg
>
> On Fri, Jan 20, 2017 at 2:06 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi Miguel,
>>
>> the LocalClusteringCoefficient algorithm returns a DataSet of type Result,
>> which basically wraps a vertex id, its degree, and the number of triangles
>> containing this vertex. The number 11 you see is indeed the degree of
>> vertex 5113. The Result type contains the method
>> getLocalClusteringCoefficientScore() which allows you to retrieve the
>> clustering coefficient score for a vertex. The method simply divides the
>> numbers of triangles by the number of potential edges between neighbors.
>>
>> I'm sorry that this is not clear in the docs. We should definitely
>> improve them to explain what the output is and how to retrieve the actual
>> clustering coefficient values. I have opened a JIRA for this [1].
>>
>> Cheers,
>> -Vasia.
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-5597
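As an illustration of the explanation above, a sketch (not code from this thread)
of reading the actual coefficient from the Result data set. It assumes LongValue
keys and NullValue vertex/edge values, and that Result is tuple-like with the
vertex id in field f0, which the printed output further below suggests; only
getLocalClusteringCoefficientScore() is taken from the thread itself:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient;
import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

public class ClusteringCoefficientScores {

    static void printScores(Graph<LongValue, NullValue, NullValue> graph) throws Exception {
        // The algorithm returns one Result per vertex (id, degree, triangle count).
        DataSet<Result<LongValue>> results =
                graph.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());

        // The coefficient itself is derived on demand: triangles divided by the
        // number of potential edges between the vertex's neighbors.
        DataSet<Tuple2<LongValue, Double>> scores = results
                .map(new MapFunction<Result<LongValue>, Tuple2<LongValue, Double>>() {
                    @Override
                    public Tuple2<LongValue, Double> map(Result<LongValue> r) {
                        return new Tuple2<>(r.f0, r.getLocalClusteringCoefficientScore());
                    }
                });
        scores.print();
    }
}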
>>
>> On 20 January 2017 at 19:31, Miguel Coimbra <miguel.e.coim...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> In the documentation of the LocalClusteringCoefficient algorithm, it is
>>> said:
>>>
>>>
>>> *The local clustering coefficient measures the connectedness of each
>>> vertex’s neighborhood.Scores range from 0.0 (no edges between neighbors) to
>>> 1.0 (neighborhood is a clique).*
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/libs/gelly.html#local-clustering-coefficient
>>>
>>> However, upon running the algorithm (undirected version), I obtained
>>> values above 1.
>>>
>>> The result I got was this. As you can see, vertex 5113 has a score of
>>> 11:
>>> (the input edges for the graph are shown further below - around *35
>>> edges*):
>>>
>>> (4907,(1,0))
>>> *(5113,(11,0))*
>>> (6008,(0,0))
>>> (6064,(1,0))
>>> (6065,(1,0))
>>> (6107,(0,0))
>>> (6192,(0,0))
>>> (6252,(1,0))
>>> (6279,(1,0))
>>> (6465,(1,0))
>>> (6545,(0,0))
>>> (6707,(1,0))
>>> (6715,(1,0))
>>> (6774,(0,0))
>>> (7088,(0,0))
>>> (7089,(1,0))
>>> (7171,(0,0))
>>> (7172,(1,0))
>>> (7763,(0,0))
>>> (7976,(1,0))
>>> (8056,(1,0))
>>> (9748,(1,0))
>>> (10191,(1,0))
>>> (10370,(1,0))
>>> (10371,(1,0))
>>> (14310,(1,0))
>>> (16785,(1,0))
>>> (19801,(1,0))
>>> (26284,(1,0))
>>> (26562,(0,0))
>>> (31724,(1,0))
>>> (32443,(1,0))
>>> (32938,(0,0))
>>> (33855,(1,0))
>>> (37929,(0,0))
>>>
>>> This was from a small isolated test with these edges:
>>>
>>> 51136008
>>> 51136774
>>> 511332938
>>> 51136545
>>> 51137088
>>> 511337929
>>> 511326562
>>> 51136107
>>> 51137171
>>> 51136192
>>> 51137763
>>> 97485113
>>> 101915113
>>> 60645113
>>> 60655113
>>> 62795113
>>> 49075113
>>> 64655113
>>> 67075113
>>> 70895113
>>> 71725113
>>> 143105113
>>> 62525113
>>> 338555113
>>> 79765113
>>> 262845113 <262%20845%20113>
>>> 80565113
>>> 103715113
>>> 167855113
>>> 198015113
>>> 67155113
>>> 317245113
>>> 324435113
>>> 103705113
>>>
>>> I am not sure what I may be doing wrong, but is there perhaps some form
>>> of normalization lacking in my execution of:
>>>
>>> org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
>>> org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient;
>>>
>>> Am I supposed to divide all scores by the greatest score obtained by the
>>> algorithm?
>>>
>>> Thank you very much!
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>> Skype: miguel.e.coimbra
>>>
>>
>>
>


Apache Flink 1.1.4 - Gelly - LocalClusteringCoefficient - Returning values above 1?

2017-01-20 Thread Miguel Coimbra
Hello,

In the documentation of the LocalClusteringCoefficient algorithm, it is
said:


*The local clustering coefficient measures the connectedness of each
vertex’s neighborhood.Scores range from 0.0 (no edges between neighbors) to
1.0 (neighborhood is a clique).*

https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/libs/gelly.html#local-clustering-coefficient


However, upon running the algorithm (undirected version), I obtained values
above 1.

The result I got was this. As you can see, vertex 5113 has a score of 11:
(the input edges for the graph are shown further below - around *35 edges*):

(4907,(1,0))
*(5113,(11,0))*
(6008,(0,0))
(6064,(1,0))
(6065,(1,0))
(6107,(0,0))
(6192,(0,0))
(6252,(1,0))
(6279,(1,0))
(6465,(1,0))
(6545,(0,0))
(6707,(1,0))
(6715,(1,0))
(6774,(0,0))
(7088,(0,0))
(7089,(1,0))
(7171,(0,0))
(7172,(1,0))
(7763,(0,0))
(7976,(1,0))
(8056,(1,0))
(9748,(1,0))
(10191,(1,0))
(10370,(1,0))
(10371,(1,0))
(14310,(1,0))
(16785,(1,0))
(19801,(1,0))
(26284,(1,0))
(26562,(0,0))
(31724,(1,0))
(32443,(1,0))
(32938,(0,0))
(33855,(1,0))
(37929,(0,0))

This was from a small isolated test with these edges:

5113 6008
5113 6774
5113 32938
5113 6545
5113 7088
5113 37929
5113 26562
5113 6107
5113 7171
5113 6192
5113 7763
9748 5113
10191 5113
6064 5113
6065 5113
6279 5113
4907 5113
6465 5113
6707 5113
7089 5113
7172 5113
14310 5113
6252 5113
33855 5113
7976 5113
26284 5113
8056 5113
10371 5113
16785 5113
19801 5113
6715 5113
31724 5113
32443 5113
10370 5113

I am not sure what I may be doing wrong, but is there perhaps some form of
normalization lacking in my execution of:

org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient;

Am I supposed to divide all scores by the greatest score obtained by the
algorithm?

Thank you very much!

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra


Re: Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-01-17 Thread Miguel Coimbra
Hello Vasia,

I am going to look into this.
Hopefully I will contribute to the implementation and documentation.

Regards,

-- Forwarded message --
From: Vasiliki Kalavri <vasilikikala...@gmail.com>
To: user@flink.apache.org
Cc:
Date: Sun, 15 Jan 2017 18:01:41 +0100
Subject: Re: Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 -
java.lang.NullPointerException
Hi Miguel,

this is a bug, thanks a lot for reporting! I think the problem is that the
implementation assumes that labelsWithHighestScores contains the vertex
itself as initial label.

Could you please open a JIRA ticket for this and attach your code and data
as an example to reproduce? We should also improve the documentation for
this library method. I see that you are initializing vertex values and you
have called getUndirected(), but the library method already does both of
these operations internally.

Cheers,
-Vasia.

On 13 January 2017 at 17:12, Miguel Coimbra <miguel.e.coim...@gmail.com>
wrote:
Hello,

If I missed the answer to this or some essential step of the documentation,
please do tell.
I am having the following problem while trying out the
org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly
API (Java).

Specs: JDK 1.8.0_102 x64
Apache Flink: 1.1.4

Suppose I have a very small (I tried with an example with 38 vertices as
well) dataset stored in a tab-separated file 3-vertex.tsv:

#id1 id2 score
0 1 0
0 2 0
0 3 0

This is just a central vertex with 3 neighbors (disconnected between
themselves).
I am loading the dataset and executing the algorithm with the following
code:


---
// Load the data from the .tsv file.
final DataSet<Tuple3<Long, Long, Double>> edgeTuples =
env.readCsvFile(inputPath)
.fieldDelimiter("\t") // node IDs are separated by spaces
.ignoreComments("#")  // comments start with "%"
.types(Long.class, Long.class, Double.class);

// Generate a graph and add reverse edges (undirected).
final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
new MapFunction<Long, Long>() {
private static final long serialVersionUID =
8713516577419451509L;
public Long map(Long value) {
return value;
}
},
env).getUndirected();

// CommunityDetection parameters.
final double hopAttenuationDelta = 0.5d;
final int iterationCount = 10;

// Prepare and trigger the execution.
DataSet<Vertex<Long, Long>> vs = graph.run(new
org.apache.flink.graph.library.CommunityDetection(iterationCount,
hopAttenuationDelta)).getVertices();

vs.print();
​---​

​Running this code throws the following exception​ (check the bold line):

​org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.lifte
dTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(F
uture.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.
exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExec
All(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
l.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
orkerThread.java:107)

*Caused by: java.lang.NullPointerException
at org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)*
at org.apache.flink.graph.spargel.ScatterGatherIteration$Gather
UdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSec
ondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTas
k.run(AbstractIterativeTask.java:146)
at org.apache.flink.runtime.iterative.task.IterationTailTask.
run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
k.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.

Re: Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-01-16 Thread Miguel Coimbra
Hello,

I created the JIRA issue at:
https://issues.apache.org/jira/browse/FLINK-5506

Is it possible to submit suggestions to the documentation?
If so, where can I do so?

I actually did this based on the example at this page (possible Flink
versions aside):

https://flink.apache.org/news/2015/08/24/introducing-flink-gelly.html#use-case-music-profiles

From the documentation I assumed that CommunityDetection has the same
internal semantics as LabelPropagation (minus the algorithm difference
itself).
It would be relevant to mention that it is not necessary to generate IDs
(as in the music example) and that an undirected representation of the
graph is generated before the algorithm is executed.

Kind regards,​


Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra


Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-01-13 Thread Miguel Coimbra
Hello,

If I missed the answer to this or some essential step of the documentation,
please do tell.
I am having the following problem while trying out the
org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly
API (Java).

Specs: JDK 1.8.0_102 x64
Apache Flink: 1.1.4

Suppose I have a very small (I tried with an example with 38 vertices as
well) dataset stored in a tab-separated file 3-vertex.tsv:

#id1 id2 score
0 1 0
0 2 0
0 3 0

This is just a central vertex with 3 neighbors (disconnected between
themselves).
I am loading the dataset and executing the algorithm with the following
code:


---
// Load the data from the .tsv file.
final DataSet<Tuple3<Long, Long, Double>> edgeTuples =
env.readCsvFile(inputPath)
.fieldDelimiter("\t") // node IDs are separated by spaces
.ignoreComments("#")  // comments start with "%"
.types(Long.class, Long.class, Double.class);

// Generate a graph and add reverse edges (undirected).
final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
new MapFunction<Long, Long>() {
private static final long serialVersionUID =
8713516577419451509L;
public Long map(Long value) {
return value;
}
},
env).getUndirected();

// CommunityDetection parameters.
final double hopAttenuationDelta = 0.5d;
final int iterationCount = 10;

// Prepare and trigger the execution.
DataSet<Vertex<Long, Long>> vs = graph.run(new
org.apache.flink.graph.library.CommunityDetection(iterationCount,
hopAttenuationDelta)).getVertices();

vs.print();
​---​

​Running this code throws the following exception​ (check the bold line):

​org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

*Caused by: java.lang.NullPointerException
at org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)*
at
org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
at
org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
at
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at
org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)​


​After a further look, I set a breakpoint (Eclipse IDE debugging) at the
line in bold:

org.apache.flink.graph.library.CommunityDetection.java (source code
accessed automatically by Maven)
// find the highest score of maxScoreLabel
*double highestScore = labelsWithHighestScore.get(maxScoreLabel);​*

​- maxScoreLabel has the value 3.​

- labelsWithHighestScore was initialized as: Map<Long, Double>
labelsWithHighestScore = new TreeMap<>();

- labelsWithHighestScore is a TreeMap and has the values:

{0=0.0}
null
null
[0=0.0]
null
1​

It seems that the value 3 should have been added to labelsWithHighestScore at
some point during execution, but because it wasn't, an exception is thrown.
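A small plain-Java sketch (not Flink code) of why that missing entry surfaces as a
NullPointerException; the values mirror the debugger view above:

import java.util.Map;
import java.util.TreeMap;

public class UnboxingNpeSketch {
    public static void main(String[] args) {
        Map<Long, Double> labelsWithHighestScore = new TreeMap<>();
        labelsWithHighestScore.put(0L, 0.0); // only label 0 is present, as in the debugger view
        long maxScoreLabel = 3L;             // the label being looked up at CommunityDetection.java:158

        // get() returns null for the missing key, and auto-unboxing that null
        // into a primitive double is what throws the NullPointerException.
        double highestScore = labelsWithHighestScore.get(maxScoreLabel);
        System.out.println(highestScore);
    }
}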

If anyone is able to shed light on the issue it would be great - what might
be causing it, am I doing something clearly wrong, or has this been fixed
in a another version?

Thank you very much,

Best regards,



Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra


Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

2016-12-09 Thread Miguel Coimbra
Hello Fabian,

So if I want to have 10 nodes with one working thread each, I would just
set this, I assume:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 10

There is progress, albeit little.
I am now running on a directory with more space.
For 10 iterations of label propagation, I am getting this error at the end
(on the TaskManager).
I thought the execution was taking too much time, so I checked CPU usage of
the TaskManager and it was really low.
Checking the log on the TaskManager, I found this error at the bottom in
bold:


2016-12-09 09:46:00,305 INFO
org.apache.flink.runtime.taskmanager.Task - Freeing
task resources for IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c
|
org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76))
(1/1)
2016-12-09 09:46:00,305 INFO
org.apache.flink.runtime.taskmanager.Task - DataSink
(collect()) (1/1) switched to FINISHED
2016-12-09 09:46:00,305 INFO
org.apache.flink.runtime.taskmanager.Task - Freeing
task resources for DataSink (collect()) (1/1)
2016-12-09 09:46:00,317 INFO
org.apache.flink.runtime.taskmanager.TaskManager  -
Un-registering task and sending final execution state FINISHED to
JobManager for task IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c
|
org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76))
(89eb2508cbda679502c2e0b258068274)
2016-12-09 09:46:00,317 INFO
org.apache.flink.runtime.taskmanager.TaskManager  -
Un-registering task and sending final execution state FINISHED to
JobManager for task DataSink (collect()) (26b8f3950f4e736b0798d28c4bf967ed)
2016-12-09 09:46:04,080 ERROR
akka.remote.EndpointWriter- Transient
association error (association remains live)
*akka.remote.OversizedPayloadException: Discarding oversized payload sent
to Actor[akka.tcp://flink@172.18.0.2:6123/user/jobmanager#1638751963]: max
allowed size 10485760 bytes, actual size of encoded class
org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage
was 79885809 bytes.*

Do you have any idea what this might be?

Kind regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

On 6 December 2016 at 19:57, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Miguel,
>
> estimating the space requirements is not trivial. It depends of course on
> the algorithm and the data itself. I'm not an expert for graph algorithms
> and don't know your datasets.
>
> But have you tried to run the algorithm in a non dockerized environment?
> That might help to figure out if this is an issue with the Docker
> configuration rather than Flink.
>
> Btw. If you want to run with a parallelism of 3 you need at least three
> slots, either 3 slots in one TM or 1 slot in each of three TMs.
>
> Best,
> Fabian
>
> 2016-12-05 17:20 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello Fabian,
>>
>> Thanks for the attention. Still haven't solved this.
>> I did set up a cron job to clean the Docker images daily - thanks for
>> that hint.
>> As a last resort, I am going to look into a 2 TB NAS to see if this works.
>>
>> What is confusing me is that this happens also for the
>> com-orkut.ungraph.txt dataset which is much smaller than
>> com-friendster.ungraph.txt but not that much bigger than the
>> com-dblp.ungraph.txt.
>>
>> DBLP - ​I am able to run the DBLP on one TaskManager container.​
>> https://snap.stanford.edu/data/com-DBLP.html
>> Nodes 317080  ~0.3 M
>> Edges 1049866 ~ 1 M
>>
>> Orkut - no disk space error.
>> https://snap.stanford.edu/data/com-Orkut.html
>> Nodes 3072441 ~3 M
>> Edges 117185083 ~ 117 M
>>
>> ​Friendster - no disk space error.
>> https://snap.stanford.edu/data/com-Friendster.html
>> Nodes 65608366 ~65 M
>> Edges 1806067135 ~ 1800 M​
>>
>> For testing purposes, I'm using a JobManager (in its own Docker
>> container), a single TaskManager (in its own Docker container) with the
>> following config parameters:
>>
>> Heap is currently configured to 6 GB:
>> taskmanager.heap.mb: 6000
>>
>> Parallelism is set as such:
>>
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 1
>>
>> It is my understanding that if I want to test for example N = 3
>> TaskManagers (each in its own Docker container) with minimum parallelism
>> within each, I would use:
>>
>> taskmanager.numberOfTaskSlots: 1
>> parallelism

Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

2016-12-05 Thread Miguel Coimbra
lected[v] = id:" + v.getId() +
"\tval:" + v.getValue());
if(!commSizes.containsKey(v.getValue())) {
commSizes.put(v.getValue(), new ArrayList());
}
commSizes.get(v.getValue()).add(v.getId());
}


System.out.println("#communities:\t" +
commSizes.keySet().size() + "\n|result|:\t" + result.count() +
"\n|collected|:\t" + collected.size());
} catch (Exception e) {
e.printStackTrace();
}
}
}

​Thanks for your time,​


Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

-- Forwarded message --

> From: Fabian Hueske <fhue...@gmail.com>
> To: user@flink.apache.org
> Cc:
> Date: Mon, 5 Dec 2016 08:40:04 +0100
> Subject: Re: Thread 'SortMerger spilling thread' terminated due to an
> exception: No space left on device
> Hi Miguel,
>
> have you found a solution to your problem?
> I'm not a docker expert but this forum thread looks like could be related
> to your problem [1].
>
> Best,
> Fabian
>
> [1] https://forums.docker.com/t/no-space-left-on-device-error/10894
>
> 2016-12-02 17:43 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello Fabian,
>>
>> I have created a directory on my host machine user directory (
>> /home/myuser/mydir ) and I am mapping it as a volume with Docker for the
>> TaskManager and JobManager containers.
>> Each container will thus have the following directory /home/flink/htmp
>>
>> host ---> container
>> /home/myuser/mydir ---> /home/flink/htmp
>>
>> I had previously done this successfully with a host directory which
>> holds several SNAP data sets.
>> In the Flink configuration file, I specified /home/flink/htmp to be used
>> as the tmp dir for the TaskManager.
>> This seems to be working, as I was able to start the cluster and invoke
>> Flink for that Friendster dataset.
>>
>> However, during execution, there were 2 intermediate files which kept
>> growing until they reached about 30 GB.
>> At that point, the Flink TaskManager threw the exception again:
>>
>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>> 'SortMerger spilling thread' terminated due to an exception: No space left
>> on device
>>
>> Here is an ls excerpt of the directory on the host (to which the
>> TaskManager container was also writing successfully) shortly before the
>> exception:
>>
>> *31G *9d177a1971322263f1597c3378885ccf.channel
>> *31G* a693811249bc5f785a79d1b1b537fe93.channel
>>
>> Now I believe the host system is capable of storing hundred GBs more, so
>> I am confused as to what the problem might be.
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>> Skype: miguel.e.coimbra
>>
>> ​
>>>
>>> Hi Miguel,
>>>
>>> the exception does indeed indicate that the process ran out of available
>>> disk space.
>>> The quoted paragraph of the blog post describes the situation when you
>>> receive the IOE.
>>>
>>> By default the systems default tmp dir is used. I don't know which
>>> folder that would be in a Docker setup.
>>> You can configure the temp dir using the taskmanager.tmp.dirs config
>>> key.
>>> Please see the configuration documentation for details [1].
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>>> setup/config.html#jobmanager-amp-taskmanager
>>>
>>> 2016-12-02 0:18 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>> ​
>>>
>>>> Hello,
>>>>
>>>> I have a problem for which I hope someone will be able to give a hint.
>>>> I am running the Flink *standalone* cluster with 2 Docker containers
>>>> (1 TaskManager and 1 JobManager) using 1 TaskManager with 30 GB of RAM.
>>>>
>>>> The dataset is a large one: SNAP Friendster, which has around 1800 M
>>>> edges.
>>>> https://snap.stanford.edu/data/com-Friendster.html
>>>>
>>>> I am trying to run the Gelly built-in label propagation algorithm on
>>>> top of it.
>>>> As this is a very big dataset, I believe I am exceeding the available
>>>> RAM and that the system is using secondary storage, which then fails:
>>>>
>>>>
>>>> Connected to JobManager at 

Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

2016-12-02 Thread Miguel Coimbra
Hello Fabian,

I have created a directory on my host machine user directory (
/home/myuser/mydir ) and I am mapping it as a volume with Docker for the
TaskManager and JobManager containers.
Each container will thus have the following directory /home/flink/htmp

host ---> container
/home/myuser/mydir ---> /home/flink/htmp

I had previously done this successfully with a host directory which
holds several SNAP data sets.
In the Flink configuration file, I specified /home/flink/htmp to be used as
the tmp dir for the TaskManager.
This seems to be working, as I was able to start the cluster and invoke
Flink for that Friendster dataset.
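(For reference, the configuration entry mentioned above, using the key Fabian
pointed to, is along these lines in the container's flink-conf.yaml; the path is
the container-side mount point described above:)

taskmanager.tmp.dirs: /home/flink/htmp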

However, during execution, there were 2 intermediate files which kept
growing until they reached about 30 GB.
At that point, the Flink TaskManager threw the exception again:

java.lang.RuntimeException: Error obtaining the sorted input: Thread
'SortMerger spilling thread' terminated due to an exception: No space left
on device

Here is an ls excerpt of the directory on the host (to which the
TaskManager container was also writing successfully) shortly before the
exception:

*31G *9d177a1971322263f1597c3378885ccf.channel
*31G* a693811249bc5f785a79d1b1b537fe93.channel

Now I believe the host system is capable of storing hundred GBs more, so I
am confused as to what the problem might be.

Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

​
>
> Hi Miguel,
>
> the exception does indeed indicate that the process ran out of available
> disk space.
> The quoted paragraph of the blog post describes the situation when you
> receive the IOE.
>
> By default the systems default tmp dir is used. I don't know which folder
> that would be in a Docker setup.
> You can configure the temp dir using the taskmanager.tmp.dirs config key.
> Please see the configuration documentation for details [1].
>
> Hope this helps,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/
> setup/config.html#jobmanager-amp-taskmanager
>
> 2016-12-02 0:18 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
> ​
>
>> Hello,
>>
>> I have a problem for which I hope someone will be able to give a hint.
>> I am running the Flink *standalone* cluster with 2 Docker containers (1
>> TaskManager and 1 JobManager) using 1 TaskManager with 30 GB of RAM.
>>
>> The dataset is a large one: SNAP Friendster, which has around 1800 M
>> edges.
>> https://snap.stanford.edu/data/com-Friendster.html
>>
>> I am trying to run the Gelly built-in label propagation algorithm on top
>> of it.
>> As this is a very big dataset, I believe I am exceeding the available RAM
>> and that the system is using secondary storage, which then fails:
>>
>>
>> Connected to JobManager at Actor[akka.tcp://flink@172.19.
>> 0.2:6123/user/jobmanager#894624508]
>> 12/01/2016 17:58:24Job execution switched to status RUNNING.
>> 12/01/2016 17:58:24DataSource (at main(App.java:33) (
>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>> SCHEDULED
>> 12/01/2016 17:58:24DataSource (at main(App.java:33) (
>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>> DEPLOYING
>> 12/01/2016 17:58:24DataSource (at main(App.java:33) (
>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>> RUNNING
>> 12/01/2016 17:58:24Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
>> switched to SCHEDULED
>> 12/01/2016 17:58:24Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
>> switched to DEPLOYING
>> 12/01/2016 17:58:24Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
>> switched to RUNNING
>> 12/01/2016 17:59:51Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
>> switched to FAILED
>> *java.lang.RuntimeException: Error obtaining the sorted input: Thread
>> 'SortMerger spilling thread' terminated due to an exception: No space left
>> on device*
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> .getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>> ask.java:1098)
>> at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.j
>> ava:86)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>> ava:486)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>> k.java:351)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>> at java.lang.Thread.run(Thread.java:745)
>> *Caused by: java.io.IOException: Thread 'SortMe

Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

2016-12-01 Thread Miguel Coimbra
Hello,

I have a problem for which I hope someone will be able to give a hint.
I am running the Flink *standalone* cluster with 2 Docker containers (1
TaskManager and 1 JobManager) using 1 TaskManager with 30 GB of RAM.

The dataset is a large one: SNAP Friendster, which has around 1800 M edges.
https://snap.stanford.edu/data/com-Friendster.html

I am trying to run the Gelly built-in label propagation algorithm on top of
it.
As this is a very big dataset, I believe I am exceeding the available RAM
and that the system is using secondary storage, which then fails:


Connected to JobManager at Actor[akka.tcp://
flink@172.19.0.2:6123/user/jobmanager#894624508]
12/01/2016 17:58:24Job execution switched to status RUNNING.
12/01/2016 17:58:24DataSource (at main(App.java:33)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
SCHEDULED
12/01/2016 17:58:24DataSource (at main(App.java:33)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
DEPLOYING
12/01/2016 17:58:24DataSource (at main(App.java:33)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
RUNNING
12/01/2016 17:58:24Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
switched to SCHEDULED
12/01/2016 17:58:24Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
switched to DEPLOYING
12/01/2016 17:58:24Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
switched to RUNNING
12/01/2016 17:59:51Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
switched to FAILED
*java.lang.RuntimeException: Error obtaining the sorted input: Thread
'SortMerger spilling thread' terminated due to an exception: No space left
on device*
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:86)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)
*Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
terminated due to an exception: No space left on device*
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: No space left on device
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
at
org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
at
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)


I do not have secondary storage limitations on the host system, so I
believe the system would be able to handle whatever is spilled to the
disk...
Perhaps this is a Docker limitation regarding the usage of the host's
secondary storage?

Or is there perhaps some configuration or setting for the TaskManager which
I am missing?
Running the label propagation of Gelly on this dataset and cluster
configuration, what would be the expected behavior if the system consumes
all the memory?


I believe the SortMerger thread is associated with the following mechanism
described in this blog post:

https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
*The Sort-Merge-Join works by first sorting both input data sets on their
join key attributes (Sort Phase) and merging the sorted data sets as a
second step (Merge Phase). The sort is done in-memory if the local
partition of a data set is small enough. Otherwise, an external merge-sort
is done by collecting data until the working memory is filled, sorting it,
writing the sorted data to the local filesystem, and starting over by
filling the working memory again with more incoming data. After all input
data has been received, sorted, and written as sorted runs to the local
file system, a fully sorted stream can be obtained. This is done by reading
the partially sorted runs from the local filesystem and sort-merging the
records on the fly. Once the sorted streams of both inputs are available,
both streams are sequentially read and merge-joined in a zig-zag fashion by
comparing the sorted join key attributes, building join element pairs for
matching keys, and advancing the sorted stream with the lower join key.*

I am still investigating the possibility that Docker is at fault regarding
secondary storage limitations, but how would I go about estimating the
amount of disk space needed for this spilling on this dataset?

Thanks for your time,

My best regards,

Miguel E. Coimbra

Too few memory segments provided. Hash Table needs at least 33 memory segments.

2016-11-08 Thread Miguel Coimbra
Dear community,

I have a problem which I hope you'll be able to help with.
I apologize in advance for the verbosity of the post.
I am running the Flink standalone cluster (not even storing to the
filesystem) with 2 Docker containers.

I set the Dockerfile's image to Flink 1.1.2, which is the same version as the
main class in the .jar.
The Docker image was configured to use Java 8, which is what the project's
pom.xml requires as well.
I have also edited the TaskManager conf/flink-conf.yaml to have the
following values:


taskmanager.heap.mb: 7512

taskmanager.network.numberOfBuffers: 16048



Properties of this host/docker setup:
- host machine has *256 GB *of RAM
- job manager container is running with default flink config
- task manager has *7.5 GB *of memory available
- task manager number of buffers is *16048 *which is very generous compared
to the default value

I am testing on the SNAP DBLP dataset:
https://snap.stanford.edu/data/com-DBLP.html
It has:

 317080 nodes
1049866 edges

These are the relevant parts of the pom.xml of the project:
*(note: the project executes without error for local executions without the
cluster)*



<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.1.2</flink.version>
</properties>
.


<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-gelly_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>

I am running (what I believe to be) a simple Gelly application, performing
the ConnectedComponents algorithm with 30 iterations:

public static void main(String[] args) {
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();


final String dataPath = args[0];

final DataSet<Tuple2<Long, Long>> edgeTuples =
env.readCsvFile(dataPath)
.fieldDelimiter("\t") // node IDs are separated by spaces
.ignoreComments("#")  // comments start with "%"
.types(Long.class, Long.class);

try {
System.out.println("Tuple size: " + edgeTuples.count());
} catch (Exception e1) {
e1.printStackTrace();
}

/*
 * @param <K> the key type for edge and vertex identifiers
 * @param <VV> the value type for vertices
 * @param <EV> the value type for edges
 * public class Graph<K, VV, EV>
 */


final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
edgeTuples,
new MapFunction<Long, Long>() {
private static final long serialVersionUID =
8713516577419451509L;
public Long map(Long value) {
return value;
}
},
env
);


try {
/**
 * @param <K> key type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 * @param <T> the return type
 *
 * class ConnectedComponents
 * implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, VV>>>
 */

DataSet<Vertex<Long, Long>> verticesWithComponents =
graph.run(new ConnectedComponents(30));
System.out.println("Component count: " +
verticesWithComponents.count());
} catch (Exception e) {
e.printStackTrace();
}
}


However, the following is output on the host machine on execution:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
flink run -m 3de7625b8e28:6123 -c flink.graph.example.App
/home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar
/home/myuser/com-dblp.ungraph.txt

Cluster configuration: Standalone cluster with JobManager at /
172.19.0.2:6123
Using address 172.19.0.2:6123 to connect to JobManager.
JobManager web interface address http://172.19.0.2:8081
Starting execution of program
Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting for
job completion.
Connected to JobManager at Actor[akka.tcp://
flink@172.19.0.2:6123/user/jobmanager#-658812967]

11/08/2016 21:22:44 DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
SCHEDULED
11/08/2016 21:22:44 DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
DEPLOYING
11/08/2016 21:22:44 DataSource (at main(App.java:25)
(org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
11/08/2016 21:22:44 DataSink (count())(1/1) switched to SCHEDULED
11/08/2016 21:22:44 DataSink (count())(1/1) switched to DEPLOYING
11/08/2016 21:22:44 DataSink (count())(1/1) switched to RUNNING
11/08/2016 21:22:44 DataSink (count())(1/1)