Re: Bug in Accumulators...

2014-11-23 Thread Aaron Davidson
As Mohit said, making Main extend Serializable should fix this example. In
general, it's not a bad idea to mark the fields you don't want to serialize
(e.g., sc and conf in this case) as @transient as well, though that is not
the issue here.

Note that this problem would not have arisen in your very specific example
if you used a while loop instead of a for-each loop, but that's really more
of a happy coincidence than something you should rely on, as nested lambdas
are virtually unavoidable in Scala.
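
For concreteness, a minimal sketch of what that looks like, assuming a driver
object shaped like the Main example further down the thread (the object name
and the lazy vals are illustrative, not from the original code):

import org.apache.spark._

object Main extends Serializable {
  // @transient keeps these fields out of any serialized copy of Main, per the
  // suggestion above; lazy so they are only created on the driver when first used.
  @transient lazy val conf = new SparkConf().setAppName("accumulator-example")
  @transient lazy val sc = new SparkContext(conf)

  def main(args: Array[String]) {
    val accum = sc.accumulator(0)
    for (i <- 1 to 10) {
      sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    }
    println("accum = " + accum.value)
    sc.stop()
  }
}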

On Sat, Nov 22, 2014 at 5:16 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 perhaps the closure ends up including the main object which is not
 defined as serializable...try making it a case object or object main
 extends Serializable.


 On Sat, Nov 22, 2014 at 4:16 PM, lordjoe lordjoe2...@gmail.com wrote:

 I posted several examples in java at http://lordjoesoftware.blogspot.com/

 Generally code like this works and I show how to accumulate more complex
 values.

 // Make two accumulators using Statistics
 final Accumulator<Integer> totalLetters = ctx.accumulator(0, "ttl");
 JavaRDD<String> lines = ...

 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
     @Override
     public Iterable<String> call(final String s) throws Exception {
         // Handle accumulator here
         totalLetters.add(s.length()); // count all letters
         ...
     }
 });

 Long numberCalls = totalCounts.value();

 I believe the mistake is to pass the accumulator to the function rather
 than
 letting the function find the accumulator - I do this in this case by
 using
 a final local variable



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19579.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Bug in Accumulators...

2014-11-23 Thread Sean Owen
Here, the Main object is not meant to be serialized. transient ought
to be for fields that live in an object that is legitimately supposed
to be serialized but whose value can be recreated on deserialization.
I feel like marking objects that aren't logically Serializable as such
is a hack, that transient extends that hack, and that it will cause
surprises later.

Hack away for toy examples, but ideally the closure cleaner would snip
whatever phantom reference is at work here. I usually try to rewrite
the Scala, as you say, to avoid the issue rather than make things
Serializable ad hoc.
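
For example, a sketch of one such rewrite (not from the original thread; the
object and method names are made up): keep the RDD operation in a small helper
whose closure only refers to its own parameters, and use a plain while loop,
so nothing from the enclosing object needs to be serialized.

import org.apache.spark._

object SumExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("SumExample"))
    val accum = sc.accumulator(0)
    var i = 0
    while (i < 10) {     // plain while loop instead of a for comprehension
      addAll(sc, accum)
      i += 1
    }
    println(accum.value)
    sc.stop()
  }

  // The closure below only captures `acc`, a parameter of this method, so no
  // enclosing instance has to be shipped to the executors.
  def addAll(sc: SparkContext, acc: Accumulator[Int]): Unit =
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => acc += x)
}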

On Sun, Nov 23, 2014 at 10:49 AM, Aaron Davidson ilike...@gmail.com wrote:
 As Mohit said, making Main extend Serializable should fix this example. In
 general, it's not a bad idea to mark the fields you don't want to serialize
 (e.g., sc and conf in this case) as @transient as well, though this is not
 the issue in this case.

 Note that this problem would not have arisen in your very specific example
 if you used a while loop instead of a for-each loop, but that's really more
 of a happy coincidence than something you should rely on, as nested lambdas
 are virtually unavoidable in Scala.

 On Sat, Nov 22, 2014 at 5:16 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 perhaps the closure ends up including the main object which is not
 defined as serializable...try making it a case object or object main
 extends Serializable.


 On Sat, Nov 22, 2014 at 4:16 PM, lordjoe lordjoe2...@gmail.com wrote:

 I posted several examples in java at http://lordjoesoftware.blogspot.com/

 Generally code like this works and I show how to accumulate more complex
 values.

 // Make two accumulators using Statistics
 final Accumulator<Integer> totalLetters = ctx.accumulator(0, "ttl");
 JavaRDD<String> lines = ...

 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
     @Override
     public Iterable<String> call(final String s) throws Exception {
         // Handle accumulator here
         totalLetters.add(s.length()); // count all letters
         ...
     }
 });

 Long numberCalls = totalCounts.value();

 I believe the mistake is to pass the accumulator to the function rather
 than
 letting the function find the accumulator - I do this in this case by
 using
 a final local variable



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19579.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug in Accumulators...

2014-11-22 Thread octavian.ganea
One month later, the same problem. I think that someone (e.g. the inventors of
Spark) should show us a substantial example of how to use accumulators. To
start, we need to see an example of the following form:

val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).map(x => foo(x, accum))

Passing accum as a parameter to the function foo will require it to be
serializable, but, AFAIK, any accumulator encapsulates the SparkContext sc,
which is not serializable and which leads to a
java.io.NotSerializableException: SparkContext exception.

I am really curious to see a real application that uses accumulators.
Otherwise, the accumulator code needs to be changed so that the above issue
no longer appears.

Best,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19567.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug in Accumulators...

2014-11-22 Thread Sean Owen
That seems to work fine. Add to your example

def foo(i: Int, a: Accumulator[Int]) = a += i

and add an action at the end to force the expression to evaluate:

sc.parallelize(Array(1, 2, 3, 4)).map(x => foo(x, accum)).foreach(println)

and it works, and you have accum with value 10 at the end.

The similar example at
http://spark.apache.org/docs/latest/programming-guide.html#accumulators
also works.
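
Put together, the whole program looks roughly like this (a sketch; the wrapper
object, imports, and SparkContext setup are added here, the rest is the
snippet above):

import org.apache.spark._

object AccumWorks {
  def foo(i: Int, a: Accumulator[Int]) = a += i

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("AccumWorks"))
    val accum = sc.accumulator(0)
    // The foreach is the action that forces evaluation; after it runs,
    // accum.value is 10 on the driver.
    sc.parallelize(Array(1, 2, 3, 4)).map(x => foo(x, accum)).foreach(println)
    println(accum.value)
    sc.stop()
  }
}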

You say AFAIK -- are you actually able to reproduce this?

On Sat, Nov 22, 2014 at 7:01 PM, octavian.ganea
octavian.ga...@inf.ethz.ch wrote:
 One month later, the same problem. I think that someone (e.g. inventors of
 Spark) should show us a big example of how to use accumulators. I can start
 telling that we need to see an example of the following form:

 val accum = sc.accumulator(0)
 sc.parallelize(Array(1, 2, 3, 4)).map(x => foo(x, accum))

 Passing accum as a parameter to function foo will require it to be
 serializable, but, a.f.a.i.k  any accumulator incapsulates the spark context
 sc which is not serializable and which lead a
 java.io.NotSerializableException: SparkContext  exception.

 I am really curious to see a real application that uses accumulators.
 Otherwise, you have to change their code such that the above issue does not
 appear anymore.

 Best,



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19567.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug in Accumulators...

2014-11-22 Thread octavian.ganea
Hi Sowen,

You're right, that example works, but here is an example that does not work
for me:

object Main {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("name")
    val sc = new SparkContext(conf)
    val accum = sc.accumulator(0)
    for (i <- 1 to 10) {
      val y = sc.parallelize(Array(1, 2, 3, 4)).mapPartitions(x =>
        foo(x, accum)).reduce(_ + _)
    }
    println("Result : " + accum.value)
    sc.stop
  }

  def foo(i: Iterator[Int], a: Accumulator[Int]): Iterator[Int] = {
    while (i.hasNext) a += i.next
    List().iterator
  }
}

This gives (I run it on a cluster with 16 nodes) the error:

Exception in thread main org.apache.spark.SparkException: Task not
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at EL_LBP_Spark$$anonfun$main$1.apply$mcVI$sp(EL_LBP_Spark.scala:16)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at EL_LBP_Spark$.main(EL_LBP_Spark.scala:15)
at EL_LBP_Spark.main(EL_LBP_Spark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 14 more


Seems that there is a problem with mapPartitions ...
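
For reference, a sketch of the fix suggested at the top of this thread
(making Main extend Serializable), applied to this example. The reduce is
swapped for count() only because foo returns an empty iterator, so reducing
its output would fail for an unrelated reason; that substitution is an
assumption, not something from the original code:

import org.apache.spark._

object Main extends Serializable {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("name")
    val sc = new SparkContext(conf)
    val accum = sc.accumulator(0)
    for (i <- 1 to 10) {
      // count() just forces evaluation; reduce(_ + _) would throw on the
      // empty RDD produced by foo, separately from the serialization issue.
      sc.parallelize(Array(1, 2, 3, 4)).mapPartitions(x => foo(x, accum)).count()
    }
    println("Result : " + accum.value)
    sc.stop
  }

  def foo(i: Iterator[Int], a: Accumulator[Int]): Iterator[Int] = {
    while (i.hasNext) a += i.next
    List().iterator
  }
}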

Thanks for your suggestion,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19576.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug in Accumulators...

2014-11-22 Thread lordjoe
I posted several examples in java at http://lordjoesoftware.blogspot.com/

Generally code like this works and I show how to accumulate more complex
values.

// Make two accumulators using Statistics
final Accumulator<Integer> totalLetters = ctx.accumulator(0, "ttl");
JavaRDD<String> lines = ...

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(final String s) throws Exception {
        // Handle accumulator here
        totalLetters.add(s.length()); // count all letters
        ...
    }
});

Long numberCalls = totalCounts.value();

I believe the mistake is to pass the accumulator to the function rather than
letting the function find the accumulator; in this case I do that by using a
final local variable.
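
A rough Scala analogue of that pattern, as a sketch (the object name and the
sample data are made up): the accumulator is a local val, and the closure
simply refers to it rather than having it handed in from some outer object.

import org.apache.spark._

object LetterCount {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("LetterCount"))
    val totalLetters = sc.accumulator(0)          // local val, like Java's final local
    val lines = sc.parallelize(Seq("to be", "or not", "to be"))
    lines.foreach(s => totalLetters += s.length)  // the closure finds the accumulator itself
    println("letters = " + totalLetters.value)
    sc.stop()
  }
}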



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19579.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug in Accumulators...

2014-11-22 Thread Mohit Jaggi
Perhaps the closure ends up including the main object, which is not defined
as serializable... try making it a case object, or writing object Main
extends Serializable.

On Sat, Nov 22, 2014 at 4:16 PM, lordjoe lordjoe2...@gmail.com wrote:

 I posted several examples in java at http://lordjoesoftware.blogspot.com/

 Generally code like this works and I show how to accumulate more complex
 values.

 // Make two accumulators using Statistics
 final Accumulator<Integer> totalLetters = ctx.accumulator(0, "ttl");
 JavaRDD<String> lines = ...

 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
     @Override
     public Iterable<String> call(final String s) throws Exception {
         // Handle accumulator here
         totalLetters.add(s.length()); // count all letters
         ...
     }
 });

 Long numberCalls = totalCounts.value();

 I believe the mistake is to pass the accumulator to the function rather
 than
 letting the function find the accumulator - I do this in this case by using
 a final local variable



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19579.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Bug in Accumulators...

2014-11-19 Thread Jake Mannix
I'm running into similar problems with accumulators failing to serialize
properly.  Are there any examples of accumulators being used in more
complex environments than simply initializing them in the same class and
then using them in a .foreach() on an RDD referenced a few lines below?

From the error above, it looks like adding any Scala complexity at all causes
the closure cleaner to freak out with accumulators...

On Fri, Nov 7, 2014 at 12:12 AM, Aaron Davidson ilike...@gmail.com wrote:

 This may be due in part to Scala allocating an anonymous inner class in
 order to execute the for loop. I would expect if you change it to a while
 loop like

 var i = 0
 while (i < 10) {
   sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
   i += 1
 }

 then the problem may go away. I am not super familiar with the closure
 cleaner, but I believe that we cannot prune beyond 1 layer of references,
 so the extra class of nesting may be screwing something up. If this is the
 case, then I would also expect replacing the accumulator with any other
 reference to the enclosing scope (such as a broadcast variable) would have
 the same result.

 On Fri, Nov 7, 2014 at 12:03 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you provide all pieces of codes which can reproduce the bug? Here
 is my test code:

 import org.apache.spark._
 import org.apache.spark.SparkContext._

 object SimpleApp {

   def main(args: Array[String]) {
     val conf = new SparkConf().setAppName("SimpleApp")
     val sc = new SparkContext(conf)

     val accum = sc.accumulator(0)
     for (i <- 1 to 10) {
       sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
     }
     sc.stop()
   }
 }

 It works fine both in client and cluster. Since this is a serialization
 bug, the outer class does matter. Could you provide it? Is there
 a SparkContext field in the outer class?

 Best Regards,
 Shixiong Zhu

 2014-10-28 0:28 GMT+08:00 octavian.ganea octavian.ga...@inf.ethz.ch:

 I am also using spark 1.1.0 and I ran it on a cluster of nodes (it works
 if I
 run it in local mode! )

 If I put the accumulator inside the for loop, everything will work fine.
 I
 guess the bug is that an accumulator can be applied to JUST one RDD.

 Still another undocumented 'feature' of Spark that no one from the people
 who maintain Spark is willing to solve or at least to tell us about ...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p17372.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






-- 

  -jake


Re: Bug in Accumulators...

2014-11-07 Thread Shixiong Zhu
Could you provide all the pieces of code that reproduce the bug? Here is
my test code:

import org.apache.spark._
import org.apache.spark.SparkContext._

object SimpleApp {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SimpleApp")
    val sc = new SparkContext(conf)

    val accum = sc.accumulator(0)
    for (i <- 1 to 10) {
      sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    }
    sc.stop()
  }
}

It works fine in both client and cluster mode. Since this is a serialization
bug, the outer class matters. Could you provide it? Is there a SparkContext
field in the outer class?
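
To make the question concrete, a sketch of the kind of outer class that would
run into exactly this kind of failure (entirely hypothetical; the class name
is made up):

import org.apache.spark._

class Job(conf: SparkConf) extends Serializable {
  val sc = new SparkContext(conf)   // SparkContext kept as a field
  val accum = sc.accumulator(0)

  def run(): Unit = {
    for (i <- 1 to 10) {
      // `accum` is a field, so the closure captures `this`; serializing the
      // Job instance then fails on the sc field:
      // java.io.NotSerializableException: org.apache.spark.SparkContext
      sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    }
  }
}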

Best Regards,
Shixiong Zhu

2014-10-28 0:28 GMT+08:00 octavian.ganea octavian.ga...@inf.ethz.ch:

 I am also using spark 1.1.0 and I ran it on a cluster of nodes (it works
 if I
 run it in local mode! )

 If I put the accumulator inside the for loop, everything will work fine. I
 guess the bug is that an accumulator can be applied to JUST one RDD.

 Still another undocumented 'feature' of Spark that no one from the people
 who maintain Spark is willing to solve or at least to tell us about ...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p17372.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Bug in Accumulators...

2014-11-07 Thread Aaron Davidson
This may be due in part to Scala allocating an anonymous inner class in
order to execute the for loop. I would expect if you change it to a while
loop like

var i = 0
while (i < 10) {
  sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
  i += 1
}

then the problem may go away. I am not super familiar with the closure
cleaner, but I believe we cannot prune beyond one layer of references, so the
extra layer of class nesting may be screwing something up. If that is the
case, then I would also expect that replacing the accumulator with any other
reference to the enclosing scope (such as a broadcast variable) would have
the same result.
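
To make that last comparison concrete, a sketch of the broadcast-variable
variant (hypothetical; not something anyone in this thread has run): the loop
has the same shape, only the closure now reads a broadcast value instead of
updating the accumulator. If the extra layer of nesting is what drags in a
non-serializable reference, this should fail in the same way.

import org.apache.spark._

object BroadcastVariant {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastVariant"))
    val factor = sc.broadcast(2)
    for (i <- 1 to 10) {
      // Same nested-closure shape as the failing example, but referencing a
      // broadcast variable from the enclosing scope instead of an accumulator.
      sc.parallelize(Array(1, 2, 3, 4)).foreach(x => println(x * factor.value))
    }
    sc.stop()
  }
}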

On Fri, Nov 7, 2014 at 12:03 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you provide all pieces of codes which can reproduce the bug? Here is
 my test code:

 import org.apache.spark._
 import org.apache.spark.SparkContext._

 object SimpleApp {

   def main(args: Array[String]) {
     val conf = new SparkConf().setAppName("SimpleApp")
     val sc = new SparkContext(conf)

     val accum = sc.accumulator(0)
     for (i <- 1 to 10) {
       sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
     }
     sc.stop()
   }
 }

 It works fine both in client and cluster. Since this is a serialization
 bug, the outer class does matter. Could you provide it? Is there
 a SparkContext field in the outer class?

 Best Regards,
 Shixiong Zhu

 2014-10-28 0:28 GMT+08:00 octavian.ganea octavian.ga...@inf.ethz.ch:

 I am also using spark 1.1.0 and I ran it on a cluster of nodes (it works
 if I
 run it in local mode! )

 If I put the accumulator inside the for loop, everything will work fine. I
 guess the bug is that an accumulator can be applied to JUST one RDD.

 Still another undocumented 'feature' of Spark that no one from the people
 who maintain Spark is willing to solve or at least to tell us about ...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p17372.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Bug in Accumulators...

2014-10-27 Thread octavian.ganea
I am also using Spark 1.1.0, and I ran it on a cluster of nodes (it works if
I run it in local mode!)

If I put the accumulator inside the for loop, everything will work fine. I
guess the bug is that an accumulator can be applied to JUST one RDD. 
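
For reference, a sketch of that workaround (the wrapper object is made up;
the point is only where the accumulator is created): with the accumulator
created inside the loop there is one per iteration, so there is no single
running total across iterations.

import org.apache.spark._

object AccumPerIteration {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("AccumPerIteration"))
    for (i <- 1 to 10) {
      val accum = sc.accumulator(0)   // created inside the loop body
      sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
      println("iteration " + i + ": " + accum.value)   // 10 each time
    }
    sc.stop()
  }
}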

Yet another undocumented 'feature' of Spark that none of the people who
maintain it are willing to fix, or at least to tell us about...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p17372.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug in Accumulators...

2014-10-26 Thread octavian.ganea
Sorry, I forgot to say that this gives the above error only when run on a
cluster, not in local mode.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p17277.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Bug in Accumulators...

2014-10-25 Thread octavian.ganea
There is for sure a bug in the Accumulators code. 

More specifically, the following code works well as expected:

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("EL LBP SPARK")
    val sc = new SparkContext(conf)
    val accum = sc.accumulator(0)
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    sc.stop
  }

but the following code (adding just a for loop) gives the weird error:
  def run(args: Array[String]) {
    val conf = new SparkConf().setAppName("EL LBP SPARK")
    val sc = new SparkContext(conf)
    val accum = sc.accumulator(0)
    for (i <- 1 to 10) {
      sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    }
    sc.stop
  }


the error:
Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task not serializable: java.io.NotSerializableException:
org.apache.spark.SparkContext
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Can someone confirm this bug?

Related to this:
http://apache-spark-user-list.1001560.n3.nabble.com/Accumulators-Task-not-serializable-java-io-NotSerializableException-org-apache-spark-SparkContext-td17262.html

http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-using-Accumulators-on-cluster-td17261.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Bug in Accumulators...

2014-10-25 Thread Rishi Yadav
Works fine with Spark 1.1.0 in the REPL.
On Sat, Oct 25, 2014 at 1:41 PM, octavian.ganea octavian.ga...@inf.ethz.ch
wrote:

 There is for sure a bug in the Accumulators code.

 More specifically, the following code works well as expected:

   def main(args: Array[String]) {
     val conf = new SparkConf().setAppName("EL LBP SPARK")
     val sc = new SparkContext(conf)
     val accum = sc.accumulator(0)
     sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
     sc.stop
   }

 but the following code (adding just a for loop) gives the weird error:
   def run(args: Array[String]) {
     val conf = new SparkConf().setAppName("EL LBP SPARK")
     val sc = new SparkContext(conf)
     val accum = sc.accumulator(0)
     for (i <- 1 to 10) {
       sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
     }
     sc.stop
   }


 the error:
 Exception in thread main org.apache.spark.SparkException: Job aborted due
 to stage failure: Task not serializable: java.io.NotSerializableException:
 org.apache.spark.SparkContext
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
 at

 org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 Can someone confirm this bug ?

 Related to this:

 http://apache-spark-user-list.1001560.n3.nabble.com/Accumulators-Task-not-serializable-java-io-NotSerializableException-org-apache-spark-SparkContext-td17262.html


 http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-using-Accumulators-on-cluster-td17261.html



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org