Random Shuffling

2015-06-15 Thread Maximilian Alber
Hi Flinksters,

I would like to shuffle my elements in the data set and then split it in
two according to some ratio. Each element in the data set has a unique id.
Is there a nice way to do it with the Flink API?
(It would be nice to have guaranteed random shuffling.)
Thanks!

Cheers,
Max


Random Selection

2015-06-15 Thread Maximilian Alber
Hi Flinksters,

I would like to randomly choose an element of my data set. But somehow I
cannot use scala.util inside my filter functions:

  val sample_x = X filter(new RichFilterFunction[Vector]() {
    var i: Int = -1

    override def open(config: Configuration) = {
      i = scala.util.Random.nextInt(N)
    }
    def filter(a: Vector) = a.id == i
  })
  val sample_y = Y filter(new RichFilterFunction[Vector]() {
    def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
  })

That's the error I get:

Exception in thread main org.apache.flink.optimizer.CompilerException: An
error occurred while translating the optimized plan to a nephele JobGraph:
Error translating node 'Filter Filter at
Test$$anonfun$10.apply(test.scala:276) : FLAT_MAP [[ GlobalProperties
[partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
grouped=null, unique=null] ]]': Could not write the user code wrapper class
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at
org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
at
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
at
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
at
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
at Test$delayedInit$body.apply(test.scala:304)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at Test$.main(test.scala:45)
at Test.main(test.scala)
Caused by: org.apache.flink.optimizer.CompilerException: Error translating
node 'Filter Filter at Test$$anonfun$10.apply(test.scala:276) : FLAT_MAP
[[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
[ordering=null, grouped=null, unique=null] ]]': Could not write the user
code wrapper class
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
at
org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
... 21 more
Caused by:
org.apache.flink.runtime.operators.util.CorruptConfigurationException:
Could not write the user code wrapper class
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
at
org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
at
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
... 26 more
Caused by: java.io.NotSerializableException:
org.apache.flink.api.scala.DataSet
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at

RE: Random Selection

2015-06-15 Thread Kruse, Sebastian
Hi everyone,

I have not reproduced it, but I think the problem here is rather the anonymous 
class. It looks like it is created within a class, not an object. Thus it is 
not “static” in Java terms, which means that its surrounding class (the 
job class) will also be serialized. And this job class seems to contain a 
DataSet field, which cannot be serialized.

If that really is the problem, you should either define your anonymous class 
within the companion object of your job class or resort directly to a function 
(and make sure that you do not pass a variable from your job class into the 
scope of the function).
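
For illustration, an untested sketch of the companion-object variant (Vector
and N are the type and range from Max's snippet):

  import org.apache.flink.api.common.functions.RichFilterFunction
  import org.apache.flink.configuration.Configuration

  object MyJob {
    // Defined in the companion object, so serializing the filter does not
    // drag the enclosing job class (and its DataSet fields) along with it.
    class RandomIdFilter(n: Int) extends RichFilterFunction[Vector] {
      private var i: Int = -1

      override def open(config: Configuration): Unit = {
        i = new scala.util.Random().nextInt(n)
      }

      override def filter(a: Vector): Boolean = a.id == i
    }
  }

  // usage: val sample_x = X.filter(new MyJob.RandomIdFilter(N))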

Cheers,
Sebastian

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Monday, 15 June 2015 14:16
To: user@flink.apache.org
Subject: Re: Random Selection


Hi Max,

the problem is that you’re trying to serialize the companion object of 
scala.util.Random. Try to create an instance of the scala.util.Random class and 
use this instance within your RichFilterFunction to generate the random numbers.

Cheers,
Till

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber 
alber.maximil...@gmail.com wrote:
Hi Flinksters,
I would like to randomly choose an element of my data set. But somehow I cannot 
use scala.util inside my filter functions:

  val sample_x = X filter(new RichFilterFunction[Vector]() {
    var i: Int = -1

    override def open(config: Configuration) = {
      i = scala.util.Random.nextInt(N)
    }
    def filter(a: Vector) = a.id == i
  })
  val sample_y = Y filter(new RichFilterFunction[Vector]() {
    def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
  })
That's the error I get:

Exception in thread main org.apache.flink.optimizer.CompilerException: An 
error occurred while translating the optimized plan to a nephele JobGraph: 
Error translating node 'Filter Filter at 
Test$anonfun$10.apply(test.scala:276) : FLAT_MAP [[ GlobalProperties 
[partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, 
grouped=null, unique=null] ]]': Could not write the user code wrapper class 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : 
java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at 
org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
at 
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
at 
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
at Test$delayedInit$body.apply(test.scala:304)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$anonfun$main$1.apply(App.scala:71)
at scala.App$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at Test$.main(test.scala:45)
at Test.main(test.scala)
Caused by: org.apache.flink.optimizer.CompilerException: Error translating node 
'Filter Filter at Test$anonfun$10.apply(test.scala:276) : FLAT_MAP [[ 
GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties 
[ordering=null, grouped=null, unique=null] ]]': Could not write the user code 
wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper 
: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
at 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
at 
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
at 
org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
at 

Re: Monitoring memory usage of a Flink Job

2015-06-15 Thread Till Rohrmann
Hi Tamara,

you can instruct Flink to write the current memory statistics to the log by
setting taskmanager.debug.memory.startLogThread: true in the Flink
configuration. Furthermore, you can control the logging interval with
taskmanager.debug.memory.logIntervalMs where the interval is specified in
milliseconds.
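
For example, the corresponding entries in flink-conf.yaml would look roughly
like this (the interval value is just an example):

  taskmanager.debug.memory.startLogThread: true
  # log the memory statistics every 10 seconds (value in milliseconds)
  taskmanager.debug.memory.logIntervalMs: 10000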

Cheers,
Till

On Mon, Jun 15, 2015 at 2:46 PM Fabian Hueske fhue...@gmail.com
wrote:

Hi Tamara,

 what kind of information do you need? Something like, size and usage of
 in-memory sort buffers or hash tables?
 Some information might be written in DEBUG logs, but I'm not sure about that.
 Besides logs, I doubt that Flink monitors memory usage.

 Cheers, Fabian

 2015-06-15 14:34 GMT+02:00 Tamara Mendt tammyme...@gmail.com:

 Hi,

 I am running some experiments on Flink and was wondering if there is some
 way to monitor the memory usage of a Flink Job (running locally and on a
 cluster). I need to run multiple jobs and compare their memory usage.

 Cheers,

 Tamara


  ​


Re: Random Selection

2015-06-15 Thread Till Rohrmann
Hi Max,

the problem is that you’re trying to serialize the companion object of
scala.util.Random. Try to create an instance of the scala.util.Random class
and use this instance within your RichFilterFunction to generate the random
numbers.
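
Applied to your snippet, that would look roughly like this (untested):

  import org.apache.flink.api.common.functions.RichFilterFunction
  import org.apache.flink.configuration.Configuration

  val sample_x = X.filter(new RichFilterFunction[Vector]() {
    private var i: Int = -1

    override def open(config: Configuration): Unit = {
      // instantiate the Random class instead of using the companion object
      val rnd = new scala.util.Random()
      i = rnd.nextInt(N)
    }

    override def filter(a: Vector): Boolean = a.id == i
  })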

Cheers,
Till

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber alber.maximil...@gmail.com
wrote:

Hi Flinksters,

 I would like to randomly choose an element of my data set. But somehow I
 cannot use scala.util inside my filter functions:

   val sample_x = X filter(new RichFilterFunction[Vector](){
 var i: Int = -1

 override def open(config: Configuration) = {
   i = scala.util.Random.nextInt(N)
 }
 def filter(a: Vector) = a.id == i
   })
   val sample_y = Y filter(new RichFilterFunction[Vector](){
 def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
   })

 That's the error I get:

 Exception in thread main org.apache.flink.optimizer.CompilerException:
 An error occurred while translating the optimized plan to a nephele
 JobGraph: Error translating node 'Filter Filter at
 Test$anonfun$10.apply(test.scala:276) : FLAT_MAP [[ GlobalProperties
 [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
 grouped=null, unique=null] ]]': Could not write the user code wrapper class
 org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
 java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
 at
 org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
 at
 org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
 at
 org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
 at
 org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
 at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
 at Test$delayedInit$body.apply(test.scala:304)
 at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
 at
 scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
 at scala.App$anonfun$main$1.apply(App.scala:71)
 at scala.App$anonfun$main$1.apply(App.scala:71)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
 at scala.App$class.main(App.scala:71)
 at Test$.main(test.scala:45)
 at Test.main(test.scala)
 Caused by: org.apache.flink.optimizer.CompilerException: Error translating
 node 'Filter Filter at Test$anonfun$10.apply(test.scala:276) : FLAT_MAP
 [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
 [ordering=null, grouped=null, unique=null] ]]': Could not write the user
 code wrapper class
 org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
 java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
 at
 org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
 ... 21 more
 Caused by:
 org.apache.flink.runtime.operators.util.CorruptConfigurationException:
 Could not write the user code wrapper class
 org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
 java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
 at
 org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
 ... 26 more
 Caused by: java.io.NotSerializableException:
 org.apache.flink.api.scala.DataSet
 

Re: Random Shuffling

2015-06-15 Thread Till Rohrmann
Hi Max,

you can always shuffle your elements using the rebalance method. What Flink
does here is distribute the elements of each partition among all
available TaskManagers. This happens in a round-robin fashion and is thus
not completely random.

A different means is the partitionCustom method, which allows you to specify
for each element to which partition it shall be sent. You would have to
specify a Partitioner to do this.

For the splitting there is at the moment no syntactic sugar. What you can do,
though, is assign each item a split ID and then use a filter operation
to filter the individual splits. Depending on your split ID distribution you
will have differently sized splits.
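
A rough sketch of the split-ID idea (untested; assumes
import org.apache.flink.api.scala._, a data: DataSet[Vector], and an 80/20
ratio just as an example):

  val withSplitId = data.map { x =>
    // every element independently draws a split ID
    val splitId = if (math.random < 0.8) 0 else 1
    (splitId, x)
  }
  val splitA = withSplitId.filter(_._1 == 0).map(_._2)
  val splitB = withSplitId.filter(_._1 == 1).map(_._2)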

Cheers,
Till

On Mon, Jun 15, 2015 at 1:50 PM Maximilian Alber alber.maximil...@gmail.com
wrote:

Hi Flinksters,

 I would like to shuffle my elements in the data set and then split it in
 two according to some ratio. Each element in the data set has a unique id.
 Is there a nice way to do it with the Flink API?
 (It would be nice to have guaranteed random shuffling.)
 Thanks!

 Cheers,
 Max

​


Re: Monitoring memory usage of a Flink Job

2015-06-15 Thread Tamara Mendt
Ok great, I will try this out and get back to you. Thanks =)

On Mon, Jun 15, 2015 at 2:52 PM, Till Rohrmann trohrm...@apache.org wrote:

 Hi Tamara,

 you can instruct Flink to write the current memory statistics to the log
 by setting taskmanager.debug.memory.startLogThread: true in the Flink
 configuration. Furthermore, you can control the logging interval with
 taskmanager.debug.memory.logIntervalMs where the interval is specified in
 milliseconds.

 Cheers,
 Till

 On Mon, Jun 15, 2015 at 2:46 PM Fabian Hueske fhue...@gmail.com
 wrote:

 Hi Tamara,

 what kind of information do you need? Something like, size and usage of
 in-memory sort buffers or hash tables?
 Some information might be written in DEBUG logs, but I'm not sure about
 that. Besides logs, I doubt that Flink monitors memory usage.

 Cheers, Fabian

 2015-06-15 14:34 GMT+02:00 Tamara Mendt tammyme...@gmail.com:

 Hi,

 I am running some experiments on Flink and was wondering if there is
 some way to monitor the memory usage of a Flink Job (running locally and on
 a cluster). I need to run multiple jobs and compare their memory usage.

 Cheers,

 Tamara


  ​




-- 
Tamara Mendt


Monitoring memory usage of a Flink Job

2015-06-15 Thread Tamara Mendt
Hi,

I am running some experiments on Flink and was wondering if there is some
way to monitor the memory usage of a Flink Job (running locally and on a
cluster). I need to run multiple jobs and compare their memory usage.

Cheers,

Tamara


Re: Random Shuffling

2015-06-15 Thread Matthias J. Sax
I think you need to implement your own Partitioner and hand it over via
DataSet.partitionCustom(partitioner, field).

(Just specify any field you like; as you don't want to group by key, it
doesn't matter.)

When implementing the partitioner, you can ignore the key parameter and
compute the output channel randomly.

This is kind of a work-around, but it should work.
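
Untested sketch of what that could look like in Scala (assuming the data set
is a tuple type with an Int in field 0, e.g. the element id):

  import org.apache.flink.api.common.functions.Partitioner

  class RandomPartitioner extends Partitioner[Int] {
    // the key is ignored; the target channel is picked at random
    override def partition(key: Int, numPartitions: Int): Int =
      scala.util.Random.nextInt(numPartitions)
  }

  val shuffled = data.partitionCustom(new RandomPartitioner, 0)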


-Matthias

On 06/15/2015 01:49 PM, Maximilian Alber wrote:
 Hi Flinksters,
 
 I would like to shuffle my elements in the data set and then split it in
 two according to some ratio. Each element in the data set has a unique
 id. Is there a nice way to do it with the Flink API?
 (It would be nice to have guaranteed random shuffling.)
 Thanks!
 
 Cheers,
 Max





Re: Monitoring memory usage of a Flink Job

2015-06-15 Thread Fabian Hueske
Hi Tamara,

what kind of information do you need? Something like, size and usage of
in-memory sort buffers or hash tables?
Some information might be written in DEBUG logs, but I'm not sure about that.
Besides logs, I doubt that Flink monitors memory usage.

Cheers, Fabian

2015-06-15 14:34 GMT+02:00 Tamara Mendt tammyme...@gmail.com:

 Hi,

 I am running some experiments on Flink and was wondering if there is some
 way to monitor the memory usage of a Flink Job (running locally and on a
 cluster). I need to run multiple jobs and compare their memory usage.

 Cheers,

 Tamara



Re: Random Selection

2015-06-15 Thread Maximilian Alber
Hi everyone!
Thanks! It seems it was the variable that caused the problem. Making an inner
class solved the issue.
Cheers,
Max

On Mon, Jun 15, 2015 at 2:58 PM, Kruse, Sebastian sebastian.kr...@hpi.de
wrote:

  Hi everyone,



 I did not reenact it, but I think the problem here is rather the anonymous
 class. It looks like it is created within a class, not an object. Thus it
 is not “static” in Java terms, which means that also its surrounding class
 (the job class) will be serialized. And in this job class, there seems to
 be a DataSet field, that cannot be serialized.



 If that really is the problem, you should either define your anonymous
 class within the companion object of your job class or resort directly to a
 function (and make sure that you do not pass a variable from your job class
 into the scope of the function).



 Cheers,

 Sebastian



 *From:* Till Rohrmann [mailto:trohrm...@apache.org]
 *Sent:* Monday, 15 June 2015 14:16
 *To:* user@flink.apache.org
 *Subject:* Re: Random Selection



 Hi Max,

 the problem is that you’re trying to serialize the companion object of
 scala.util.Random. Try to create an instance of the scala.util.Random
  class and use this instance within your RichFilterFunction to generate
 the random numbers.

 Cheers,
 Till

 On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber
  alber.maximil...@gmail.com
 wrote:

Hi Flinksters,

  I would like to randomly choose an element of my data set. But somehow I
 cannot use scala.util inside my filter functions:

   val sample_x = X filter(new RichFilterFunction[Vector](){
 var i: Int = -1

 override def open(config: Configuration) = {
   i = scala.util.Random.nextInt(N)
 }
 def filter(a: Vector) = a.id == i
   })
   val sample_y = Y filter(new RichFilterFunction[Vector](){
 def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
   })

 That's the error I get:

 Exception in thread main org.apache.flink.optimizer.CompilerException:
 An error occurred while translating the optimized plan to a nephele
 JobGraph: Error translating node 'Filter Filter at
 Test$anonfun$10.apply(test.scala:276) : FLAT_MAP [[ GlobalProperties
 [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
 grouped=null, unique=null] ]]': Could not write the user code wrapper class
 org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
 java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
 at
 org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
 at
 org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
 at
 org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
 at
 org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
 at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
 at Test$delayedInit$body.apply(test.scala:304)
 at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
 at
 scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
 at scala.App$anonfun$main$1.apply(App.scala:71)
 at scala.App$anonfun$main$1.apply(App.scala:71)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
 at scala.App$class.main(App.scala:71)
 at Test$.main(test.scala:45)
 at Test.main(test.scala)
 Caused by: org.apache.flink.optimizer.CompilerException: Error translating
 node 'Filter Filter at Test$anonfun$10.apply(test.scala:276) : FLAT_MAP
 [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
 [ordering=null, grouped=null, unique=null] ]]': Could not write the user
 code wrapper class
 org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
 java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
 at
 

Re: Random Shuffling

2015-06-15 Thread Maximilian Alber
Thanks!

Ok, so for a random shuffle I need partitionCustom. But in that case the
data might end up out of balance?

For the splitting: is there no way to get exact sizes?

Cheers,
Max

On Mon, Jun 15, 2015 at 2:26 PM, Till Rohrmann trohrm...@apache.org wrote:

 Hi Max,

 you can always shuffle your elements using the rebalance method. What
 Flink here does is to distribute the elements of each partition among all
 available TaskManagers. This happens in a round-robin fashion and is thus
 not completely random.

 A different means is the partitionCustom method which allows you to
 specify for each element to which partition it shall be sent. You would
 have to specify a Partitioner to do this.

 For the splitting there is at the moment no syntactic sugar. What you can do,
 though, is to assign each item a split ID and then use a filter operation
 to filter the individual splits. Depending on your split ID distribution you
 will have differently sized splits.

 Cheers,
 Till

 On Mon, Jun 15, 2015 at 1:50 PM Maximilian Alber
 alber.maximil...@gmail.com
 wrote:

 Hi Flinksters,

 I would like to shuffle my elements in the data set and then split it in
 two according to some ratio. Each element in the data set has a unique id.
 Is there a nice way to do it with the Flink API?
 (It would be nice to have guaranteed random shuffling.)
 Thanks!

 Cheers,
 Max

 ​



Re: Help with Flink experimental Table API

2015-06-15 Thread Shiti Saxena
Hi,

Can I work on the issue with TupleSerializer or is someone working on it?

On Mon, Jun 15, 2015 at 11:20 AM, Aljoscha Krettek aljos...@apache.org
wrote:

 Hi,
 the reason why this doesn't work is that the TupleSerializer cannot deal
 with null values:

 @Test
 def testAggregationWithNull(): Unit = {

   val env = ExecutionEnvironment.getExecutionEnvironment
   val table = env.fromElements[(Integer, String)](
     (123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

   val total = table.select('_1.sum).collect().head.productElement(0)
   assertEquals(total, 702)
 }

 it would have to be modified in a similar way to the PojoSerializer and 
 RowSerializer. You could either leave the tests as they are now in your pull 
 request or also modify the TupleSerializer. Both seem fine to me.

 Cheers,

 Aljoscha


 On Sun, 14 Jun 2015 at 20:28 Shiti Saxena ssaxena@gmail.com wrote:

 Hi,

 Re-writing the test in the following manner works. But I am not sure if
 this is the correct way.

 def testAggregationWithNull(): Unit = {

   val env = ExecutionEnvironment.getExecutionEnvironment
   val dataSet = env.fromElements[(Integer, String)](
     (123, "a"), (234, "b"), (345, "c"), (0, "d"))

   implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
     Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
     Seq("id", "name"))

   val rowDataSet = dataSet.map {
     entry =>
       val row = new Row(2)
       val amount = if (entry._1 < 100) null else entry._1
       row.setField(0, amount)
       row.setField(1, entry._2)
       row
   }

   val total =
     rowDataSet.toTable.select('id.sum).collect().head.productElement(0)
   assertEquals(total, 702)
 }



 On Sun, Jun 14, 2015 at 11:42 PM, Shiti Saxena ssaxena@gmail.com
 wrote:

 Hi,

 For

  val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"),
  (345, "c"), (null, "d")).toTable

 I get the following error,

 Error translating node 'Data Source at
 org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505)
 (org.apache.flink.api.java.io.CollectionInputFormat) : NONE [[
 GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
 [ordering=null, grouped=null, unique=null] ]]': null
 org.apache.flink.optimizer.CompilerException: Error translating node
 'Data Source at
 org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505)
 (org.apache.flink.api.java.io.CollectionInputFormat) : NONE [[
 GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
 [ordering=null, grouped=null, unique=null] ]]': null
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
 at
 org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:87)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
 at
 org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:52)
 at
 org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
 at
 org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
 at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
 at
 org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
 at
 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at
 org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
 at
 org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at
 org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at
 org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
 at 

Re: No space left on device IOException when using Cross operator

2015-06-15 Thread Stephan Ewen
Cross is a quadratic operation. As such, it produces very large results on
moderate inputs, which can easily exceed memory and disk space, if the
subsequent operation requires to gather all data (such as for the sort in
your case).

If you use on both inputs 10 MB of 100 byte elements (100K elements per
input), you end up with 10 billion elements after the cross, which is 1 TB
in size (assuming the result elements are also 100 bytes).

This is an inherent issue of using a quadratic operation on data that is
too large to be handled by a quadratic operation. Not much anyone can do
about this.

Try and see if you can replace the Cross operation by something else (Join,
CoGroup) or whether you can at least filter aggressively after the Cross
before the next operation.
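
For the filtering option, something along these lines (untested; vertices is
assumed to be a DataSet[V] with a simplified element type, and the threshold
is illustrative):

  import org.apache.flink.api.scala._

  case class V(id: Int, dist: Array[Int])
  val threshold = 3

  val pairs = vertices.cross(vertices)
    .filter { case (v1, v2) =>
      v1.id != v2.id &&
        v1.dist(v2.id) <= threshold &&
        v2.dist(v1.id) <= threshold
    }
  // only the small filtered result reaches the subsequent grouping/sort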


On Mon, Jun 15, 2015 at 2:18 PM, Mihail Vieru vi...@informatik.hu-berlin.de
 wrote:

  Hi,

 I get the following *No space left on device IOException* when using
 the following Cross operator.
 The inputs for the operator are each just *10MB* in size (same input for
 IN1 and IN2; 1000 tuples) and I get the exception after Flink manages to
 fill *50GB* of SSD space and the partition becomes full.

 I have found a similar problem in the mailing list here:

 https://mail-archives.apache.org/mod_mbox/flink-user/201412.mbox/%3CCAN0XJzNiTyWDfcDLhsP6iJVhpUgnYn0ACy4ueS2R6YSB68Fr%3DA%40mail.gmail.com%3E

 As I currently don't have any more free file system space left, specifying
 other temporary folders for Flink is not an option.
 Any ideas on what could help?

 I'm using the latest 0.9-SNAPSHOT and run the job in a local execution
 environment.

 Best,
 Mihail


 *java.lang.Exception: The data preparation for task 'GroupReduce
 (GroupReduce at main(APSPNaiveVernicaJob.java:100))' , caused an error:
 Error obtaining the sorted input: Thread 'SortMerger spilling thread'
 terminated due to an exception: No space left on device*
 *at
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)*
 *at
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)*
 *at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)*
 *at java.lang.Thread.run(Thread.java:745)*
 *Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
 Thread 'SortMerger spilling thread' terminated due to an exception: No
 space left on device*
 *at
 org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)*
 *at
 org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)*
 *at
 org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)*
 *at
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)*
 *... 3 more*
 *Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
 terminated due to an exception: No space left on device*
 *at
 org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:785)*
 *Caused by: java.io.IOException: No space left on device*
 *at sun.nio.ch.FileDispatcherImpl.write0(Native Method)*
 *at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)*
 *at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)*
 *at sun.nio.ch.IOUtil.write(IOUtil.java:65)*
 *at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)*
 *at
 org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:340)*




 *at
 org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:471)
 public static class crossKAPSPFilter implements
     CrossFunction<Vertex<Integer, Tuple2<Integer[], String>>,
                   Vertex<Integer, Tuple2<Integer[], String>>,
                   Tuple2<Integer, String>> {

     @Override
     public Tuple2<Integer, String> cross(
             Vertex<Integer, Tuple2<Integer[], String>> vertex1,
             Vertex<Integer, Tuple2<Integer[], String>> vertex2) throws Exception {

         int vertexIdFirst = vertex1.f0;
         int vertexIdSecond = vertex2.f0;
         Integer[] vertexDistanceVectorFirst = vertex1.f1.f0;
         Integer[] vertexDistanceVectorSecond = vertex2.f1.f0;

         if (vertexIdFirst != vertexIdSecond
                 && vertexDistanceVectorFirst[vertexIdSecond] <= grapDistThreshold
                 && vertexDistanceVectorSecond[vertexIdFirst] <= grapDistThreshold) {
             return new Tuple2<Integer, String>(vertex1.f0, vertex1.f1.f1);
         }
         else return null;
     }
 }




Re: Help with Flink experimental Table API

2015-06-15 Thread Aljoscha Krettek
I think you can work on it. By the way, there are actually two serializers.
For Scala, CaseClassSerializer is responsible for tuples as well. In Java,
TupleSerializer is responsible for, well, Tuples.

On Tue, 16 Jun 2015 at 06:25 Shiti Saxena ssaxena@gmail.com wrote:

 Hi,

 Can I work on the issue with TupleSerializer or is someone working on it?

 On Mon, Jun 15, 2015 at 11:20 AM, Aljoscha Krettek aljos...@apache.org
 wrote:

 Hi,
 the reason why this doesn't work is that the TupleSerializer cannot deal
 with null values:

 @Test
 def testAggregationWithNull(): Unit = {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val table = env.fromElements[(Integer, String)](
  (123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

  val total = table.select('_1.sum).collect().head.productElement(0)
  assertEquals(total, 702)
 }

 it would have to be modified in a similar way to the PojoSerializer and 
 RowSerializer. You could either leave the tests as they are now in your pull 
 request or also modify the TupleSerializer. Both seem fine to me.

 Cheers,

 Aljoscha


 On Sun, 14 Jun 2015 at 20:28 Shiti Saxena ssaxena@gmail.com wrote:

 Hi,

 Re-writing the test in the following manner works. But I am not sure if
 this is the correct way.

 def testAggregationWithNull(): Unit = {

   val env = ExecutionEnvironment.getExecutionEnvironment
   val dataSet = env.fromElements[(Integer, String)](
     (123, "a"), (234, "b"), (345, "c"), (0, "d"))

   implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
     Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
     Seq("id", "name"))

   val rowDataSet = dataSet.map {
     entry =>
       val row = new Row(2)
       val amount = if (entry._1 < 100) null else entry._1
       row.setField(0, amount)
       row.setField(1, entry._2)
       row
   }

   val total =
     rowDataSet.toTable.select('id.sum).collect().head.productElement(0)
   assertEquals(total, 702)
 }



 On Sun, Jun 14, 2015 at 11:42 PM, Shiti Saxena ssaxena@gmail.com
 wrote:

 Hi,

 For

  val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"),
  (345, "c"), (null, "d")).toTable

 I get the following error,

 Error translating node 'Data Source at
 org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505)
 (org.apache.flink.api.java.io.CollectionInputFormat) : NONE [[
 GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
 [ordering=null, grouped=null, unique=null] ]]': null
 org.apache.flink.optimizer.CompilerException: Error translating node
 'Data Source at
 org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505)
 (org.apache.flink.api.java.io.CollectionInputFormat) : NONE [[
 GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
 [ordering=null, grouped=null, unique=null] ]]': null
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
 at
 org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:87)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
 at
 org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
 at
 org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
 at
 org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:52)
 at
 org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
 at
 org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
 at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
 at
 org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
 at
 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at
 org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
 at
 

No space left on device IOException when using Cross operator

2015-06-15 Thread Mihail Vieru

Hi,

I get the following *No space left on device IOException* when using 
the following Cross operator.
The inputs for the operator are each just *10MB* in size (same input for 
IN1 and IN2; 1000 tuples) and I get the exception after Flink manages to 
fill *50GB* of SSD space and the partition becomes full.


I have found a similar problem in the mailing list here:
https://mail-archives.apache.org/mod_mbox/flink-user/201412.mbox/%3CCAN0XJzNiTyWDfcDLhsP6iJVhpUgnYn0ACy4ueS2R6YSB68Fr%3DA%40mail.gmail.com%3E

As I currently don't have any more free file system space left, 
specifying other temporary folders for Flink is not an option.

Any ideas on what could help?

I'm using the latest 0.9-SNAPSHOT and run the job in a local execution 
environment.


Best,
Mihail


/java.lang.Exception: The data preparation for task 'GroupReduce 
(GroupReduce at main(APSPNaiveVernicaJob.java:100))' , caused an error: 
Error obtaining the sorted input: Thread 'SortMerger spilling thread' 
terminated due to an exception: No space left on device//
//at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)//
//at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)//

//at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)//
//at java.lang.Thread.run(Thread.java:745)//
//Caused by: java.lang.RuntimeException: Error obtaining the sorted 
input: Thread 'SortMerger spilling thread' terminated due to an 
exception: No space left on device//
//at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)//
//at 
org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)//
//at 
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)//
//at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)//

//... 3 more//
//Caused by: *java.io.IOException*: Thread 'SortMerger spilling thread' 
terminated due to an exception: *No space left on device*//
//at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:785)//

//Caused by: java.io.IOException: No space left on device//
//at sun.nio.ch.FileDispatcherImpl.write0(Native Method)//
//at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)//
//at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)//
//at sun.nio.ch.IOUtil.write(IOUtil.java:65)//
//at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)//
//at 
org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:340)//
//at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:471)




public static class crossKAPSPFilter implements
    CrossFunction<Vertex<Integer, Tuple2<Integer[], String>>,
                  Vertex<Integer, Tuple2<Integer[], String>>,
                  Tuple2<Integer, String>> {

    @Override
    public Tuple2<Integer, String> cross(
            Vertex<Integer, Tuple2<Integer[], String>> vertex1,
            Vertex<Integer, Tuple2<Integer[], String>> vertex2) throws Exception {

        int vertexIdFirst = vertex1.f0;
        int vertexIdSecond = vertex2.f0;
        Integer[] vertexDistanceVectorFirst = vertex1.f1.f0;
        Integer[] vertexDistanceVectorSecond = vertex2.f1.f0;

        if (vertexIdFirst != vertexIdSecond
                && vertexDistanceVectorFirst[vertexIdSecond] <= grapDistThreshold
                && vertexDistanceVectorSecond[vertexIdFirst] <= grapDistThreshold) {
            return new Tuple2<Integer, String>(vertex1.f0, vertex1.f1.f1);
        }
        else return null;
    }
}



RE: Load balancing

2015-06-15 Thread Kruse, Sebastian
Hi Gianmarco,

The processing time is quadratic in the size of the single elements. I was 
already applying that strategy that you also proposed, but tried to find out if 
there is a way of balancing the subitems of these large items over the workers 
without shuffling the whole dataset. However, I noticed the PKG strategy and 
maybe it will come in handy in some other place :)
So, thanks again for the pointers!

Cheers,
Sebastian

From: Gianmarco De Francisci Morales [mailto:g...@apache.org]
Sent: Friday, 12 June 2015 19:02
To: user@flink.apache.org
Subject: Re: Load balancing

Hi Sebastian,

Maybe I misunderstood your problem.
Is the processing time quadratic in the size of the single element of the 
dataset?
Or is it quadratic in the number of elements of the dataset with a single key?
That is, is the element heavy or is the key heavy?

In the second case you can use PKG.
In the first case, I don't think you really need any system level help.
Given that you can split up the work for each element, you can just transform 
the dataset so that a single heavy element is replaced by a set of generated 
sub-elements, with the ID of the original element as the key.
Then you can process the subelements in parallel, and finally group by key to 
aggregate the result.
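
A rough Flink sketch of that pattern (untested; assumes
import org.apache.flink.api.scala._ and items: DataSet[Item]; the element type,
chunk size, and per-chunk work are illustrative):

  case class Item(id: Long, values: Seq[Int])

  // 1) replace each heavy element by sub-elements keyed by the original id
  val subItems = items.flatMap { item =>
    item.values.grouped(1000).map(chunk => (item.id, chunk))
  }

  // 2) spread the sub-elements over the workers and do the heavy work per chunk
  val partials = subItems.rebalance().map { case (id, chunk) => (id, chunk.sum) }

  // 3) re-aggregate the partial results per original element
  val results = partials.groupBy(0).sum(1)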

Cheers,

--
Gianmarco

On 11 June 2015 at 19:16, Kruse, Sebastian 
sebastian.kr...@hpi.de wrote:
Hi Gianmarco,

Thanks for the pointer!

I had a quick look at the paper, but unfortunately I don’t see a connection to 
my problem. I have a batch job with elements in my dataset that need quadratically 
more processing time depending on their size. The largest ones, which cause 
higher-than-average load, shall be split up and the splits shall be distributed 
among the workers. Your paper says “In principle, depending on the application, 
two different messages might impose a different load on workers. However, in most 
cases these differences even out and modeling such application-specific 
differences is not necessary.” Maybe I am missing something, but doesn’t this 
assumption render PKG inapplicable to my case? Objections to that are of course 
welcome :)

Cheers,
Sebastian

From: Gianmarco De Francisci Morales 
[mailto:g...@apache.org]
Sent: Wednesday, 10 June 2015 15:40
To: user@flink.apache.org
Subject: Re: Load balancing

We have been working on an adaptive load balancing strategy that would address 
exactly the issue you point out.
FLINK-1725 is the starting point for the integration.

Cheers,

--
Gianmarco

On 9 June 2015 at 20:31, Fabian Hueske 
fhue...@gmail.com wrote:
Hi Sebastian,
I agree, shuffling only specific elements would be a very useful feature, but 
unfortunately it's not supported (yet).
Would you like to open a JIRA for that?
Cheers, Fabian

2015-06-09 17:22 GMT+02:00 Kruse, Sebastian 
sebastian.kr...@hpi.de:
Hi folks,

I would like to do some load balancing within one of my Flink jobs to achieve 
good scalability. The rebalance() method is not applicable in my case, as the 
runtime is dominated by the processing of very few larger elements in my 
dataset. Hence, I need to distribute the processing work for these elements 
among the nodes in the cluster. To do so, I subdivide those elements into 
partial tasks and want to distribute these partial tasks to other nodes by 
employing a custom partitioner.

Now, my question is the following: Actually, I do not need to shuffle the 
complete dataset but only a few elements. So is there a way of telling, within 
the partitioner, that data should reside on the same task manager? Thanks!

Cheers,
Sebastian