bug with MapPartitions?

2014-10-17 Thread davidkl
Hello,

Maybe there is something I am not understanding, but I believe this code
should not throw any serialization error when I run it in the Spark shell.
Similar code using map instead of mapPartitions works just fine.

import java.io.BufferedInputStream
import java.io.FileInputStream
import com.testing.DataPacket

val inStream = new BufferedInputStream(new FileInputStream(inputFile))
val p = new DataPacket(inStream)
val c = Array(p)
def myfunc[T](iter: Iterator[T]): Iterator[String] = {
  var res = List[String]()
  while (iter.hasNext) {
    val cur = iter.next
    res ::= cur.toString // build a string per element (placeholder for the real logic)
  }
  res.iterator
}
val r = sc.parallelize(c).mapPartitions(myfunc).collect()

This throws the following:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
...
...
Caused by: java.io.NotSerializableException: java.io.BufferedInputStream
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
...
...

Why is this code failing? The constructor of DataPacket just reads data and
does not keep any reference to the BufferedInputStream. Note that this is
not the real code, but a simplification I made while trying to isolate the
cause of the error. Using map instead of mapPartitions on this works fine.







Re: bug with MapPartitions?

2014-10-17 Thread Akshat Aranya
There seems to be a problem with what gets captured in the closure that is
passed into mapPartitions (myfunc in your case): when the function is defined
directly in the shell, it can end up capturing the enclosing shell objects,
which here reference the non-serializable BufferedInputStream.

I've had a similar problem before:

http://apache-spark-user-list.1001560.n3.nabble.com/TaskNotSerializableException-when-running-through-Spark-shell-td16574.html

Try putting your myFunc in an object:

object Mapper {
  def myFunc = ...
}
val r = sc.parallelize(c).mapPartitions(Mapper.myFunc).collect()
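
For concreteness, a self-contained version of that suggestion could look
roughly like the sketch below, reusing the loop from the original post (the
object name and the per-element work are illustrative, not the real logic).
Because myFunc lives in a standalone object rather than being defined as a
top-level function in the shell, the value passed to mapPartitions does not
drag in the shell objects that hold inStream:

object Mapper {
  // Defined outside the shell's wrapper objects, so the function shipped to
  // the executors does not capture inStream or the BufferedInputStream.
  def myFunc[T](iter: Iterator[T]): Iterator[String] = {
    var res = List[String]()
    while (iter.hasNext) {
      val cur = iter.next
      res ::= cur.toString // placeholder for the real per-element work
    }
    res.iterator
  }
}

val r = sc.parallelize(c).mapPartitions(Mapper.myFunc[DataPacket] _).collect()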
