Random Shuffling
Hi Flinksters,

I would like to shuffle the elements in my data set and then split it in two according to some ratio. Each element in the data set has a unique id. Is there a nice way to do this with the Flink API? (It would be nice to have guaranteed random shuffling.)

Thanks!
Cheers,
Max
Random Selection
Hi Flinksters,

I would like to randomly choose an element of my data set, but somehow I cannot use scala.util inside my filter functions:

val sample_x = X filter(new RichFilterFunction[Vector](){
  var i: Int = -1
  override def open(config: Configuration) = {
    i = scala.util.Random.nextInt(N)
  }
  def filter(a: Vector) = a.id == i
})

val sample_y = Y filter(new RichFilterFunction[Vector](){
  def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
})

That's the error I get:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Error translating node 'Filter Filter at Test$$anonfun$10.apply(test.scala:276) : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at Test$delayedInit$body.apply(test.scala:304)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(test.scala:45)
    at Test.main(test.scala)
Caused by: org.apache.flink.optimizer.CompilerException: Error translating node 'Filter Filter at Test$$anonfun$10.apply(test.scala:276) : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
    at org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
    ... 21 more
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
    ... 26 more
Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    ...
RE: Random Selection
Hi everyone,

I did not reproduce it, but I think the problem here is rather the anonymous class. It looks like it is created within a class, not an object. Thus it is not "static" in Java terms, which means that its surrounding class (the job class) will also be serialized. And in this job class there seems to be a DataSet field, which cannot be serialized.

If that really is the problem, you should either define your anonymous class within the companion object of your job class, or resort directly to a function (and make sure that you do not pass a variable from your job class into the scope of the function).

Cheers,
Sebastian

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Monday, June 15, 2015 14:16
To: user@flink.apache.org
Subject: Re: Random Selection

Hi Max,

the problem is that you're trying to serialize the companion object of scala.util.Random. Try to create an instance of the scala.util.Random class and use this instance within your RichFilterFunction to generate the random numbers.

Cheers,
Till

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber <alber.maximil...@gmail.com> wrote:

[quoted question, code, and stack trace trimmed; see the original "Random Selection" message above]
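A minimal sketch of the fix Sebastian describes: moving the anonymous filter into the job's companion object, so that no outer instance (and thus no DataSet field) is captured. Vector, X and N are taken from Max's code; the case class stand-in and the helper name are illustrative only:

    import org.apache.flink.api.common.functions.RichFilterFunction
    import org.apache.flink.configuration.Configuration

    case class Vector(id: Int)  // stand-in for the Vector type from the question

    object Test {
      // Defined in the companion object, so the anonymous class keeps no
      // reference to an outer job instance holding non-serializable DataSet fields.
      def sampleFilter(n: Int) = new RichFilterFunction[Vector] {
        private var i: Int = -1
        override def open(config: Configuration): Unit = {
          i = new scala.util.Random().nextInt(n)
        }
        override def filter(a: Vector): Boolean = a.id == i
      }
    }

    // usage inside the job: val sample_x = X filter Test.sampleFilter(N)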
Re: Monitoring memory usage of a Flink Job
Hi Tamara,

you can instruct Flink to write the current memory statistics to the log by setting taskmanager.debug.memory.startLogThread: true in the Flink configuration. Furthermore, you can control the logging interval with taskmanager.debug.memory.logIntervalMs, where the interval is specified in milliseconds.

Cheers,
Till

On Mon, Jun 15, 2015 at 2:46 PM Fabian Hueske <fhue...@gmail.com> wrote:

Hi Tamara, what kind of information do you need? Something like size and usage of the in-memory sort buffers or hash tables? Some information might be written to the DEBUG logs, but I'm not sure about that. Besides logs, I doubt that Flink monitors memory usage.

Cheers,
Fabian

2015-06-15 14:34 GMT+02:00 Tamara Mendt <tammyme...@gmail.com>:

[quoted question trimmed; see "Monitoring memory usage of a Flink Job" below]
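For reference, the two settings Till mentions go into the Flink configuration file (flink-conf.yaml); the interval below is only an example value:

    taskmanager.debug.memory.startLogThread: true
    # log the memory statistics every 5 seconds (example value)
    taskmanager.debug.memory.logIntervalMs: 5000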
Re: Random Selection
Hi Max,

the problem is that you're trying to serialize the companion object of scala.util.Random. Try to create an instance of the scala.util.Random class and use this instance within your RichFilterFunction to generate the random numbers.

Cheers,
Till

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber <alber.maximil...@gmail.com> wrote:

[quoted question, code, and stack trace trimmed; see the original "Random Selection" message above]
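A minimal sketch of Till's suggestion: instantiate scala.util.Random inside the rich function instead of referring to its companion object. X, N and the Vector type come from the original question; the case class is a stand-in:

    import org.apache.flink.api.common.functions.RichFilterFunction
    import org.apache.flink.configuration.Configuration

    case class Vector(id: Int)  // stand-in for the question's Vector type

    val sample_x = X filter (new RichFilterFunction[Vector] {
      @transient private var rnd: scala.util.Random = _
      private var i: Int = -1
      override def open(config: Configuration): Unit = {
        rnd = new scala.util.Random()  // an instance, not the companion object
        i = rnd.nextInt(N)
      }
      override def filter(a: Vector): Boolean = a.id == i
    })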
Re: Random Shuffling
Hi Max,

you can always shuffle your elements using the rebalance method. What Flink does here is distribute the elements of each partition among all available TaskManagers. This happens in a round-robin fashion and is thus not completely random. An alternative is the partitionCustom method, which allows you to specify for each element to which partition it shall be sent. You would have to specify a Partitioner to do this.

For the splitting there is at the moment no syntactic sugar. What you can do, though, is assign each item a split ID and then use a filter operation to select the individual splits. Depending on your split ID distribution you will have differently sized splits.

Cheers,
Till

On Mon, Jun 15, 2015 at 1:50 PM Maximilian Alber <alber.maximil...@gmail.com> wrote:

[quoted question trimmed; see the original "Random Shuffling" message above]
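A minimal sketch of the split-by-ID approach Till describes. Seeding the random draw with each element's unique id keeps the assignment deterministic, so both filter branches agree even if the source is evaluated twice; data, Vector and the 80/20 ratio are stand-ins based on the question:

    case class Vector(id: Int)  // stand-in with the unique id from the question

    val ratio = 0.8
    // deterministic per element: the RNG is seeded with the element's unique id
    def inFirstSplit(v: Vector): Boolean =
      new scala.util.Random(v.id).nextDouble() < ratio

    val first  = data filter (v => inFirstSplit(v))
    val second = data filter (v => !inFirstSplit(v))

The seeding matters: with an unseeded Random, the two filter branches could see different draws for the same element.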
Re: Monitoring memory usage of a Flink Job
Ok great, I will try this out and get back to you. Thanks =)

On Mon, Jun 15, 2015 at 2:52 PM, Till Rohrmann <trohrm...@apache.org> wrote:

[quoted replies and original question trimmed; see the other messages in this thread]

--
Tamara Mendt
Monitoring memory usage of a Flink Job
Hi,

I am running some experiments on Flink and was wondering if there is some way to monitor the memory usage of a Flink job (running locally and on a cluster). I need to run multiple jobs and compare their memory usage.

Cheers,
Tamara
Re: Random Shuffling
I think you need to implement your own Partitioner and hand it in via DataSet.partitionCustom(partitioner, field). (Just specify any field you like; as you don't want to group by key, it doesn't matter.) When implementing the partitioner, you can ignore the key parameter and compute the output channel randomly.

This is kind of a work-around, but it should work.

-Matthias

On 06/15/2015 01:49 PM, Maximilian Alber wrote:

[quoted question trimmed; see the original "Random Shuffling" message above]
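A sketch of the work-around Matthias describes: a custom Partitioner that ignores the key and picks an output channel at random (the data set and its Int id field are assumed from the earlier messages):

    import org.apache.flink.api.common.functions.Partitioner

    class RandomPartitioner extends Partitioner[Int] {
      @transient private lazy val rnd = new scala.util.Random()
      // ignore the key and compute the output channel randomly
      override def partition(key: Int, numPartitions: Int): Int =
        rnd.nextInt(numPartitions)
    }

    // any key field works, since the partitioner never looks at it
    val shuffled = data.partitionCustom(new RandomPartitioner, "id")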
Re: Monitoring memory usage of a Flink Job
Hi Tamara,

what kind of information do you need? Something like size and usage of the in-memory sort buffers or hash tables? Some information might be written to the DEBUG logs, but I'm not sure about that. Besides logs, I doubt that Flink monitors memory usage.

Cheers,
Fabian

2015-06-15 14:34 GMT+02:00 Tamara Mendt <tammyme...@gmail.com>:

[quoted question trimmed; see "Monitoring memory usage of a Flink Job" above]
Re: Random Selection
Hi everyone!

Thanks! It seems it was the variable that caused the problem. Making it an inner class solved the issue.

Cheers,
Max

On Mon, Jun 15, 2015 at 2:58 PM, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:

[quoted replies, code, and stack trace trimmed; see the "RE: Random Selection" message above]
Re: Random Shuffling
Thanks! Ok, so for a random shuffle I need partitionCustom. But in that case the data might be out of balance afterwards?

For the splitting: is there no way to get exact sizes?

Cheers,
Max

On Mon, Jun 15, 2015 at 2:26 PM, Till Rohrmann <trohrm...@apache.org> wrote:

[quoted reply trimmed; see the "Re: Random Shuffling" message above]
Re: Help with Flink experimental Table API
Hi,

Can I work on the issue with the TupleSerializer, or is someone working on it?

On Mon, Jun 15, 2015 at 11:20 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi, the reason why this doesn't work is that the TupleSerializer cannot deal with null values:

@Test
def testAggregationWithNull(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val table = env.fromElements[(Integer, String)](
    (123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable
  val total = table.select('_1.sum).collect().head.productElement(0)
  assertEquals(total, 702)
}

It would have to be modified in a similar way to the PojoSerializer and RowSerializer. You could either leave the tests as they are now in your pull request or also modify the TupleSerializer. Both seem fine to me.

Cheers,
Aljoscha

On Sun, 14 Jun 2015 at 20:28 Shiti Saxena <ssaxena@gmail.com> wrote:

Hi, re-writing the test in the following manner works, but I am not sure if this is the correct way:

def testAggregationWithNull(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val dataSet = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (0, "d"))
  implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
    Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
    Seq("id", "name"))
  val rowDataSet = dataSet.map { entry =>
    val row = new Row(2)
    val amount = if (entry._1 < 100) null else entry._1
    row.setField(0, amount)
    row.setField(1, entry._2)
    row
  }
  val total = rowDataSet.toTable.select('id.sum).collect().head.productElement(0)
  assertEquals(total, 702)
}

On Sun, Jun 14, 2015 at 11:42 PM, Shiti Saxena <ssaxena@gmail.com> wrote:

Hi, for

val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

I get the following error:

Error translating node 'Data Source at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.java.io.CollectionInputFormat) : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.java.io.CollectionInputFormat) : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
    at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:87)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
    at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:52)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
    at org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    ...
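The RowSerializer-style null handling Aljoscha refers to prefixes each field with a null marker. A conceptual sketch of that idea in Scala (not Flink's actual TupleSerializer code):

    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.core.memory.{DataInputView, DataOutputView}

    // write a boolean null marker, then the payload only if the value is non-null
    def writeField[T](value: T, serializer: TypeSerializer[T], target: DataOutputView): Unit =
      if (value == null) {
        target.writeBoolean(true)
      } else {
        target.writeBoolean(false)
        serializer.serialize(value, target)
      }

    // read the marker first and skip deserialization for null fields
    def readField[T](serializer: TypeSerializer[T], source: DataInputView): T =
      if (source.readBoolean()) null.asInstanceOf[T]
      else serializer.deserialize(source)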
Re: No space left on device IOException when using Cross operator
Cross is a quadratic operation. As such, it produces very large results on moderate inputs, which can easily exceed memory and disk space if the subsequent operation requires gathering all data (such as for the sort in your case).

If you use 10 MB of 100-byte elements on both inputs (100K elements per input), you end up with 10 billion elements after the cross, which is 1 TB in size (assuming the result elements are also 100 bytes). This is an inherent issue of using a quadratic operation with data that is too large to be handled by a quadratic operation. Not much anyone can do about this.

Try and see if you can replace the Cross operation by something else (Join, CoGroup) or whether you can at least filter aggressively after the Cross before the next operation.

On Mon, Jun 15, 2015 at 2:18 PM, Mihail Vieru <vi...@informatik.hu-berlin.de> wrote:

[quoted question, stack trace, and code trimmed; see the original "No space left on device" message below]
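A sketch of the "filter aggressively after the Cross" suggestion in Scala, modeled on the fields of Mihail's CrossFunction (the tuple layout of vertices and the graphDistThreshold value are assumptions based on his code):

    // vertices: DataSet[(Int, (Array[Int], String))], i.e. (id, (distance vector, label))
    val candidates = vertices.cross(vertices)
      .filter { case ((id1, (dist1, _)), (id2, (dist2, _))) =>
        id1 != id2 &&
        dist1(id2) <= graphDistThreshold &&
        dist2(id1) <= graphDistThreshold
      }
      .map { case ((id1, (_, label1)), _) => (id1, label1) }

The filter drops non-matching pairs right after the cross, so they never reach the downstream sort.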
Re: Help with Flink experimental Table API
I think you can work on it. By the way, there are actually two serializers. For Scala, the CaseClassSerializer is responsible for tuples as well. In Java, the TupleSerializer is responsible for, well, Tuples.

On Tue, 16 Jun 2015 at 06:25 Shiti Saxena <ssaxena@gmail.com> wrote:

[quoted thread trimmed; see the earlier "Re: Help with Flink experimental Table API" message above]
No space left on device IOException when using Cross operator
Hi,

I get the following "No space left on device" IOException when using the following Cross operator. The inputs for the operator are each just 10MB in size (same input for IN1 and IN2; 1000 tuples), and I get the exception after Flink manages to fill 50GB of SSD space and the partition becomes full.

I have found a similar problem in the mailing list here:
https://mail-archives.apache.org/mod_mbox/flink-user/201412.mbox/%3CCAN0XJzNiTyWDfcDLhsP6iJVhpUgnYn0ACy4ueS2R6YSB68Fr%3DA%40mail.gmail.com%3E

As I currently don't have any more free file system space left, specifying other temporary folders for Flink is not an option. Any ideas on what could help? I'm using the latest 0.9-SNAPSHOT and run the job in a local execution environment.

Best,
Mihail

java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at main(APSPNaiveVernicaJob.java:100))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
    at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:785)
Caused by: java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
    at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:340)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:471)

public static class crossKAPSPFilter implements
        CrossFunction<Vertex<Integer, Tuple2<Integer[], String>>,
                      Vertex<Integer, Tuple2<Integer[], String>>,
                      Tuple2<Integer, String>> {

    @Override
    public Tuple2<Integer, String> cross(
            Vertex<Integer, Tuple2<Integer[], String>> vertex1,
            Vertex<Integer, Tuple2<Integer[], String>> vertex2) throws Exception {
        int vertexIdFirst = vertex1.f0;
        int vertexIdSecond = vertex2.f0;
        Integer[] vertexDistanceVectorFirst = vertex1.f1.f0;
        Integer[] vertexDistanceVectorSecond = vertex2.f1.f0;
        if (vertexIdFirst != vertexIdSecond
                && vertexDistanceVectorFirst[vertexIdSecond] <= grapDistThreshold
                && vertexDistanceVectorSecond[vertexIdFirst] <= grapDistThreshold) {
            return new Tuple2<Integer, String>(vertex1.f0, vertex1.f1.f1);
        }
        else return null;
    }
}
RE: Load balancing
Hi Gianmarco,

The processing time is quadratic in the size of the single elements. I was already applying the strategy that you proposed, but tried to find out if there is a way of balancing the sub-items of these large items over the workers without shuffling the whole dataset. However, I noticed the PKG strategy and maybe it will come in handy in some other place :) So, thanks again for the pointers!

Cheers,
Sebastian

From: Gianmarco De Francisci Morales [mailto:g...@apache.org]
Sent: Friday, June 12, 2015 19:02
To: user@flink.apache.org
Subject: Re: Load balancing

Hi Sebastian,

Maybe I misunderstood your problem. Is the processing time quadratic in the size of a single element of the dataset? Or is it quadratic in the number of elements of the dataset with a single key? That is, is the element heavy or is the key heavy? In the second case you can use PKG. In the first case, I don't think you really need any system-level help. Given that you can split up the work for each element, you can just transform the dataset so that a single heavy element is replaced by a set of generated sub-elements, with the ID of the original element as the key. Then you can process the sub-elements in parallel, and finally group by key to aggregate the result.

Cheers,
--
Gianmarco

On 11 June 2015 at 19:16, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:

Hi Gianmarco,

Thanks for the pointer! I had a quick look at the paper, but unfortunately I don't see a connection to my problem. I have a batch job with elements in my dataset that need processing time quadratic in their size. The largest ones, which cause higher-than-average load, shall be split up, and the splits shall be distributed among the workers. Your paper says: "In principle, depending on the application, two different messages might impose a different load on workers. However, in most cases these differences even out and modeling such application-specific differences is not necessary." Maybe I am missing something, but doesn't this assumption render PKG inapplicable to my case? Objections to that are of course welcome :)

Cheers,
Sebastian

From: Gianmarco De Francisci Morales [mailto:g...@apache.org]
Sent: Wednesday, June 10, 2015 15:40
To: user@flink.apache.org
Subject: Re: Load balancing

We have been working on an adaptive load balancing strategy that would address exactly the issue you point out. FLINK-1725 is the starting point for the integration.

Cheers,
--
Gianmarco

On 9 June 2015 at 20:31, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Sebastian,

I agree, shuffling only specific elements would be a very useful feature, but unfortunately it's not supported (yet). Would you like to open a JIRA for that?

Cheers,
Fabian

2015-06-09 17:22 GMT+02:00 Kruse, Sebastian <sebastian.kr...@hpi.de>:

Hi folks,

I would like to do some load balancing within one of my Flink jobs to achieve good scalability. The rebalance() method is not applicable in my case, as the runtime is dominated by the processing of very few larger elements in my dataset. Hence, I need to distribute the processing work for these elements among the nodes in the cluster. To do so, I subdivide those elements into partial tasks and want to distribute these partial tasks to other nodes by employing a custom partitioner. Now, my question is the following: actually, I do not need to shuffle the complete dataset but only a few elements. So is there a way of telling within the partitioner that data should reside on the same task manager?

Thanks!
Cheers,
Sebastian
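A sketch of the split-process-merge pattern Gianmarco describes, with hypothetical Item, items, chunkSize, process and combine standing in for the real workload:

    import org.apache.flink.api.scala._

    case class Item(id: Long, payload: Seq[Double])  // hypothetical heavy element

    // 1. replace each heavy element by sub-elements keyed by the original id
    val subItems = items.flatMap { item: Item =>
      item.payload.grouped(chunkSize).map(chunk => (item.id, chunk.toSeq))
    }

    // 2. spread the sub-elements over the workers and process them in parallel
    val partial = subItems.rebalance().map { case (id, chunk) => (id, process(chunk)) }

    // 3. group by the original id to aggregate the partial results
    val merged = partial.groupBy(0).reduce((a, b) => (a._1, combine(a._2, b._2)))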