Re: k means - waiting for dataset

2015-05-26 Thread Pa Rö
thanks for your message,

maybe you can give me an example for the GroupReduceFunction?

2015-05-22 23:29 GMT+02:00 Fabian Hueske fhue...@gmail.com:

 There are two ways to do that:

 1) You use a GroupReduceFunction, which gives you an iterator over all
 points similar to Hadoop's ReduceFunction.
 2) You use the ReduceFunction to compute the sum and the count at the same
 time (e.g., in two fields of a Tuple2) and use a MapFunction to do the
 final division.

 I'd go with the first choice. It's easier.

 Best, Fabian
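
[Editor's sketch, not from the original thread: option 1 boils down to consuming the whole group through an iterator and dividing once at the end. The plain-Java stand-in below mirrors the shape of a GroupReduceFunction's reduce(Iterable, Collector) without depending on Flink; a real implementation would implement org.apache.flink.api.common.functions.GroupReduceFunction and use the thread's GeoTimeDataTupel type instead of Double.]

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class GroupReduceSketch {
    // Stand-in for GroupReduceFunction#reduce(Iterable<IN>, Collector<OUT>):
    // iterate over ALL points of one cluster, sum them, divide once.
    static void reduce(Iterable<Double> points, Consumer<Double> out) {
        double sum = 0.0;
        long count = 0;
        for (double p : points) {
            sum += p;
            count++;
        }
        out.accept(sum / count); // a single division gives the correct mean
    }

    public static void main(String[] args) {
        List<Double> cluster = Arrays.asList(1.0, 2.0, 3.0, 4.0);
        reduce(cluster, mean -> System.out.println(mean)); // prints 2.5
    }
}
```

In Flink this reducer would be wired in with `.groupBy(0).reduceGroup(...)` instead of `.groupBy(0).reduce(...)`.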


Re: k means - waiting for dataset

2015-05-22 Thread Paul Röwer

good evening,

sorry, my English is not the best.

When computing the new centroid, I want to sum all points of the cluster
and form the new center from that.
In my other implementation I first sum all points and at the end I
divide by the number of points.

For example: (1+2+3+4)/4 = 2.5

In Flink the reduce always combines two points into one,
so for the example above: (1+2)/2 = 1.5 -- (1.5+3)/2 = 2.25 -- (2.25+4)/2 = 3.125

How can I rewrite my function so that it works like my other implementation?
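
[Editor's note: the two schemes above can be reproduced with a small plain-Java sketch; this is an illustration only and not part of the original code.]

```java
public class AverageDemo {
    // Correct scheme: sum everything, divide once at the end.
    static double sumThenDivide(double[] points) {
        double sum = 0;
        for (double p : points) sum += p;
        return sum / points.length;
    }

    // The pairwise scheme described above: average two values,
    // then average the running result with the next value.
    // Later points get an ever larger weight, so the result is skewed.
    static double pairwiseAverage(double[] points) {
        double acc = points[0];
        for (int i = 1; i < points.length; i++) {
            acc = (acc + points[i]) / 2;
        }
        return acc;
    }

    public static void main(String[] args) {
        double[] points = {1, 2, 3, 4};
        System.out.println(sumThenDivide(points));   // prints 2.5
        System.out.println(pairwiseAverage(points)); // prints 3.125
    }
}
```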

best regards,
paul


Am 22.05.2015 um 16:52 schrieb Stephan Ewen:

Sorry, I don't understand the question.

 Can you describe a bit better what you mean by "sum all points and
 divide by the counter"?


Thanks!

On Fri, May 22, 2015 at 2:06 PM, Pa Rö paul.roewer1...@googlemail.com 
mailto:paul.roewer1...@googlemail.com wrote:


I have fixed a bug in the input reading, but the results are still
different.

I think I have localized the problem: in the other implementation I
sum all geo points/time points and divide by the counter.
But in Flink I sum two points and divide by two, then sum the
next...

the method is the following:

    // sums and counts point coordinates
    private static final class CentroidAccumulator implements
            ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

        private static final long serialVersionUID = -4868797820391121771L;

        public Tuple2<Integer, GeoTimeDataTupel> reduce(
                Tuple2<Integer, GeoTimeDataTupel> val1,
                Tuple2<Integer, GeoTimeDataTupel> val2) {
            return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
                    addAndDiv(val1.f0, val1.f1, val2.f1));
        }
    }

    private static GeoTimeDataTupel addAndDiv(int clusterid,
            GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
        long time = (input1.getTime() + input2.getTime()) / 2;
        List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
        list.add(input1.getGeo());
        list.add(input2.getGeo());
        LatLongSeriable geo = Geometry.getGeoCenterOf(list);

        return new GeoTimeDataTupel(geo, time, POINT);
    }

How can I sum all points and divide by the counter?
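
[Editor's sketch, illustration only: the standard fix is to carry a (sum, count) pair through the reduce instead of a running average, because (sum, count) combines associatively while averages do not. In Flink this would be a ReduceFunction over a tuple holding sum and count, plus a final MapFunction for the division; the plain-Java version below shows just the arithmetic.]

```java
public class SumCountReduce {
    // Combine two {sum, count} accumulators; associative and commutative,
    // so a pairwise reduce may apply it in any order.
    static double[] combine(double[] a, double[] b) {
        return new double[]{a[0] + b[0], a[1] + b[1]};
    }

    // Reduce a cluster pairwise, then divide once at the end
    // (in Flink, that final division is the MapFunction's job).
    static double mean(double[] points) {
        double[] acc = {points[0], 1};
        for (int i = 1; i < points.length; i++) {
            acc = combine(acc, new double[]{points[i], 1});
        }
        return acc[0] / acc[1];
    }

    public static void main(String[] args) {
        System.out.println(mean(new double[]{1, 2, 3, 4})); // prints 2.5
    }
}
```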


2015-05-22 9:53 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com
mailto:paul.roewer1...@googlemail.com:

hi,
if I print the centroids, all are shown in the output. I have
implemented k-means with MapReduce and Spark; with the same input
I get the same output. But in Flink I get a one-cluster output
with this input set. (I use CSV files from the GDELT project.)

here is my class:

public class FlinkMain {

    public static void main(String[] args) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        int maxIteration = 1; // Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduce(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager());
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
        finalCentroids.writeAsText(outputPath + "/centers"); // print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

private static final class 


k means - waiting for dataset

2015-05-21 Thread Pa Rö
hi flink community,

I have implemented k-means for clustering temporal geo data. I use the
following GitHub project and my own data structure:
https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java

Now I have the problem that Flink reads the centroids from a file and
continues working in parallel. If I look at the results, I have the
feeling that the program loads only one centroid point.

I work with Flink 0.8.1; if I update to 0.9 milestone 1 I get the
following exception:
ERROR actor.OneForOneStrategy: exception during creation
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
at akka.actor.Props.newActor(Props.scala:337)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more

How can I tell Flink that it should wait until the dataset is loaded, and
what does this exception mean?

best regards,
paul


Re: k means - waiting for dataset

2015-05-21 Thread Till Rohrmann
Hi Paul,

could you share your code with us so that we can see whether there is an
error?

Does this error also occur with 0.9-SNAPSHOT?

Cheers,
Till





Re: k means - waiting for dataset

2015-05-21 Thread Till Rohrmann
Concerning your first problem that you only see one resulting centroid,
your code looks good modulo the parts you haven't posted.

However, your problem could simply be caused by a bad selection of initial
centroids. If, for example, all centroids except one don't get any
points assigned, then only one centroid will survive the iteration step.
How do you select the initial centroids?

To check that all centroids are read you can print the contents of the
centroids DataSet. Furthermore, you can simply println the new centroids
after each iteration step. In local mode you can then observe the
computation.
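
[Editor's sketch illustrating the point about initial centroids; the class and method names are hypothetical and not from the thread. Seeding with k distinct input points avoids the degenerate case where coinciding initial centroids collapse into a single cluster.]

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class InitCentroids {
    // Pick k input points as initial centroids by shuffling a copy of the
    // input; as long as the inputs are distinct, the seeds are distinct too.
    // If initial centroids coincide (or never get any points assigned),
    // the iteration can collapse to a single surviving centroid.
    static List<Double> pickInitial(List<Double> points, int k, long seed) {
        List<Double> copy = new ArrayList<>(points);
        Collections.shuffle(copy, new Random(seed));
        return new ArrayList<>(copy.subList(0, k));
    }

    public static void main(String[] args) {
        List<Double> pts = List.of(1.0, 2.0, 3.0, 10.0, 11.0, 12.0);
        System.out.println(pickInitial(pts, 2, 42L));
    }
}
```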

Cheers,
Till




Re: k means - waiting for dataset

2015-05-21 Thread Stephan Ewen
Hi!

This problem should not depend on any user code. There are no user-code
dependent actors in Flink.

Is there more stack trace that you can send us? It looks like the core
exception that is causing the issue is not part of the stack trace.

Greetings,
Stephan


