On Tue, Dec 24, 2013 at 7:29 AM, Ameet Kini <[email protected]> wrote:

>
> If Java serialization is the only one that properly works for closures,
> then I shouldn't be setting "spark.closure.serializer" to
> "org.apache.spark.serializer.KryoSerializer",
>

My understanding is that it's not that kryo wouldn't work for closures per
se; it's that closure serialization is used not only for user-defined
closures but also for a lot of closures internal to Spark. For those, of
course, there's no way to enable kryo support.

Data objects (and their serializer), on the other hand, are not defined
anywhere but in the user's code. Therefore overriding the object serializer
is in general safe, while overriding the closure serializer is not.


> and my only hope for getting lookup (and other such methods that still use
> closure serializers) to work is to either a) use only Java serialization
> for my objects, or b) have my objects implement Serializable but enable
> Kryo as well for object serialization.
>

or c) for kryo-only objects, avoid the parts of the Spark api that currently
use closures to communicate data objects between the front end and the
backend (as I do). The most annoying of those is fold().
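
For example, since reduce() is one of the methods that (per the earlier
message quoted below) already does correct front/back end serialization, a
fold over kryo-only values can often be rewritten as a reduce so that no
data object rides in a closure. A rough sketch -- KryoThing is a made-up,
kryo-registered type used only for illustration:

  import org.apache.spark.SparkContext

  // hypothetical type: kryo-registered, deliberately NOT java-serializable
  class KryoThing(val value: Long) {
    def merge(other: KryoThing) = new KryoThing(value + other.value)
  }

  val sc = new SparkContext("local", "kryo-only-reduce")
  // parallelize() and reduce() move data through the object serializer,
  // so no KryoThing ever has to travel inside a closure
  val things = sc.parallelize(Seq(new KryoThing(1), new KryoThing(2)))
  val total = things.reduce((a, b) => a.merge(b))
  // things.fold(new KryoThing(0))((a, b) => a.merge(b)) would additionally
  // need java serialization for the closure-carried zero value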

In fact, d) you can always wrap a kryo object into a byte array on the front
end, pass the java-serializable byte array through a closure, and unwrap it
on the backend. This technique is extremely ugly, though, with methods like
fold(), which force you to use the same object type in the front-end and
backend operation (and it is obviously not necessarily the type you want for
the purposes of elegant folding). I had this problem extensively with 3rd
party types for which I have no desire to add java serialization (mostly
hadoop Writables of various kinds), so the obviously desirable solution is
to add kryo support for them without having to modify the original class.
This mostly works for me.
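
To make d) concrete, a rough sketch below -- SomeWritable is a made-up
kryo-only type standing in for a hadoop Writable, and pairs is assumed to be
an existing RDD[(SomeWritable, String)]:

  import com.esotericsoftware.kryo.Kryo
  import com.esotericsoftware.kryo.io.{Input, Output}

  // hypothetical kryo-only type, no java serialization
  class SomeWritable(var id: Long)

  // helpers live in an object so the closure doesn't have to capture them
  object KryoBytes {
    def newKryo(): Kryo = {
      val k = new Kryo()
      k.register(classOf[SomeWritable])  // mirror the KryoRegistrator here
      k
    }
    def toBytes(obj: AnyRef): Array[Byte] = {
      val out = new Output(4096, -1)
      newKryo().writeClassAndObject(out, obj)
      out.toBytes
    }
    def fromBytes[T](bytes: Array[Byte]): T =
      newKryo().readClassAndObject(new Input(bytes)).asInstanceOf[T]
  }

  // front end: wrap the kryo-only key into a plain, java-serializable byte array
  val keyBytes = KryoBytes.toBytes(new SomeWritable(200))

  // back end: the closure captures only keyBytes; unwrap inside the task
  // (one Kryo per element is wasteful; mapPartitions would amortize it)
  val hits = pairs.filter { case (k, _) =>
    k.id == KryoBytes.fromBytes[SomeWritable](keyBytes).id
  }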


> For option b, I'd be setting "spark.serializer" to
> "org.apache.spark.serializer.KryoSerializer" but leaving
> "spark.closure.serializer" at its default value (Java).
>
Yes. Like I said, changing the closure serializer is undesirable unless you
can guarantee proper serialization support for all Spark internal closures
as well as your own closures.
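
Concretely, that amounts to something like this on the driver (same
System.setProperty style as in the code further down the thread; the
registrator name is the one from your example):

  import org.apache.spark.SparkContext

  // object/data serializer: kryo, plus your registrator
  System.setProperty("spark.serializer",
    "org.apache.spark.serializer.KryoSerializer")
  System.setProperty("spark.kryo.registrator",
    "geotrellis.spark.KryoRegistrator")
  // spark.closure.serializer is deliberately NOT set, so closures keep
  // using the default java serialization
  val sc = new SparkContext("local", "kryo-data-java-closures")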


>
> If the above is true, then it seems like, as it stands today, the best
> practice is for objects that use Kryo to also implement either Serializable
> or Externalizable for closures to work properly.
>

Again, this is only true for a small portion of the Spark api.  Most of the
Spark api doesn't have this problem, so you may well get away with either
not using the "problematic" api or pre-serializing objects into byte arrays
when you do.


> Thanks,
> Ameet
>
>
>
>
> On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov <[email protected]>wrote:
>
>> The problem really is that in certain cases task results -- and
>> front-end-passed parameters -- are passed thru closures. For closures, only
>> java serializer is properly supported (afaik).
>>
>> There have been a limited number of fixes for data parameter communication
>> between the front end and backend using other-than-java serialization
>> (e.g. parallelize() and collect() -- these methods do not use closures to
>> pass in/grab data objects anymore); however, a number of methods are still
>> using closures to pass in data objects.
>>
>> afaik the methods doing correct front/back end parameter serialization
>> are:
>>
>> collect()
>> take() (maybe)
>> parallelize()
>> reduce()
>>
>> Everything else (fold(), etc.) that communicates data between the front
>> end and backend still wraps data into closures. For something like fold()
>> you'd in fact have to use a type that has both Java and Kryo support at
>> the same time, because it will always use both the closure and object
>> serializers while executing.
>>
>> This IMO is of course inconsistent with the assumption that the same data
>> type should be supported uniformly regardless of where it is serialized,
>> but that's the state of things as it stands.
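>>
>> To make that concrete, a small sketch of a type that fold() would be happy
>> with -- Acc is a made-up class and accRdd stands for an existing RDD[Acc].
>> It is java-serializable (for the closure-carried zero value) and is also
>> registered with kryo for the object serializer:
>>
>>   // hypothetical accumulator type: both java- and kryo-friendly
>>   class Acc(var sum: Long) extends java.io.Serializable {
>>     def add(other: Acc): Acc = { sum += other.sum; this }
>>   }
>>   // ...and kryo.register(classOf[Acc]) in the KryoRegistrator
>>
>>   val total = accRdd.fold(new Acc(0))((a, b) => a.add(b))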
>>
>>
>>
>> On Mon, Dec 23, 2013 at 8:21 AM, Ameet Kini <[email protected]> wrote:
>>
>>> Thanks Imran.
>>>
>>> I tried setting "spark.closure.serializer" to
>>> "org.apache.spark.serializer.KryoSerializer" and now end up seeing a
>>> NullPointerException when the executor starts up. This is a snippet of the
>>> executor's log. Notice how "registered TileIdWritable" and "registered
>>> ArgWritable" are printed, so I see that my KryoRegistrator is being called.
>>> However, it's not clear why there's a follow-on NPE. My spark log level is
>>> set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG), so I'm not
>>> sure if there is some other way to get the executor to be more verbose as
>>> to the cause of the NPE.
>>>
>>> When I take out the spark.closure.serializer setting (i.e., go back to
>>> the default Java serialization), the executors start up fine and execute
>>> other RDD actions, but of course not the lookup action (my original
>>> problem). With spark.closure.serializer set to kryo, it dies with an NPE
>>> during executor startup.
>>>
>>>
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver:
>>> akka.tcp://[redacted]:48147/user/StandaloneScheduler
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully
>>> registered with driver
>>> 13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
>>> 13/12/23 11:00:36 INFO Remoting: Starting remoting
>>> 13/12/23 11:00:36 INFO Remoting: Remoting started; listening on
>>> addresses :[akka.tcp://[redacted]:56483]
>>> 13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses:
>>> [akka.tcp://[redacted]:56483]
>>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster:
>>> akka.tcp://[redacted]:48147/user/BlockManagerMaster
>>> 13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity
>>> 323.9 MB.
>>> 13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root
>>> dirs '/tmp'
>>> 13/12/23 11:00:36 INFO DiskStore: Created local directory at
>>> /tmp/spark-local-20131223110036-4335
>>> 13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617
>>> with id = ConnectionManagerId([redacted],41617)
>>> 13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register
>>> BlockManager
>>> 13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
>>> 13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker:
>>> akka.tcp://[redacted]:48147/user/MapOutputTracker
>>> 13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is
>>> /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
>>> 13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>>> geotrellis.spark.KryoRegistrator
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>>> geotrellis.spark.KryoRegistrator
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>>> geotrellis.spark.KryoRegistrator
>>> 13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator:
>>> geotrellis.spark.KryoRegistrator
>>> registered TileIdWritable
>>> registered TileIdWritable
>>> registered TileIdWritable
>>> registered TileIdWritable
>>> registered ArgWritable
>>> registered ArgWritable
>>> registered ArgWritable
>>> registered ArgWritable
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 2
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 1
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 3
>>> 13/12/23 11:00:37 INFO Executor: Running task ID 0
>>> 13/12/23 11:00:37 INFO Executor: Fetching
>>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with
>>> timestamp 1387814434436
>>> 13/12/23 11:00:37 INFO Utils: Fetching
>>> http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to
>>> /tmp/fetchFileTemp2456419097284083628.tmp
>>> 13/12/23 11:00:37 INFO Executor: Adding
>>> file[redacted]/spark/work/app-20131223110034-0000/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar
>>> to class loader
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>>> Thread[pool-7-thread-4,5,main]
>>> java.lang.NullPointerException
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at scala.Option.flatMap(Option.scala:170)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>>> Thread[pool-7-thread-2,5,main]
>>> java.lang.NullPointerException
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>        at scala.Option.flatMap(Option.scala:170)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>>> Thread[pool-7-thread-1,5,main]
>>> java.lang.NullPointerException
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at scala.Option.flatMap(Option.scala:170)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread
>>> Thread[pool-7-thread-3,5,main]
>>> java.lang.NullPointerException
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
>>>         at scala.Option.flatMap(Option.scala:170)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:724)
>>> 13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called
>>>
>>> Thanks,
>>> Ameet
>>>
>>>
>>> On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <[email protected]>wrote:
>>>
>>>> there is a separate setting for serializing closures,
>>>> "spark.closure.serializer" (listed here:
>>>> http://spark.incubator.apache.org/docs/latest/configuration.html).
>>>>
>>>> That setting is used to serialize whatever is used by all the functions
>>>> on an RDD, e.g., map, filter, and lookup.  Those closures include
>>>> referenced variables, like your TileIdWritable.
>>>>
>>>> So you need to either change that to use kryo, or make your object
>>>> serializable to java.
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <[email protected]>wrote:
>>>>
>>>>>
>>>>> I'm getting the below NotSerializableException despite using Kryo to
>>>>> serialize that class (TileIdWritable).
>>>>>
>>>>> The offending line: awtestRdd.lookup(TileIdWritable(200))
>>>>>
>>>>> Initially I thought Kryo was not being registered properly, so I tried
>>>>> running operations over awtestRdd that force a shuffle (e.g.,
>>>>> groupByKey), and those seemed to work fine. So it seems to be specific
>>>>> to the "TileIdWritable(200)" argument to lookup().  Is there anything
>>>>> unique about companion objects and Kryo serialization? I even replaced
>>>>> "TileIdWritable(200)" with "new TileIdWritable" but still see that
>>>>> exception.
>>>>>
>>>>>
>>>>> class TileIdWritable {
>>>>>  //
>>>>> }
>>>>>
>>>>> object TileIdWritable {
>>>>>  def apply(value: Long) = new TileIdWritable
>>>>> }
>>>>>
>>>>>
>>>>> My Kryo registrator:
>>>>> class KryoRegistrator extends SparkKryoRegistrator {
>>>>>     override def registerClasses(kryo: Kryo) {
>>>>>       println("Called KryoRegistrator")  // I see this printed during
>>>>> shuffle operations
>>>>>       val r = kryo.register(classOf[TileIdWritable])
>>>>>       val s = kryo.register(classOf[ArgWritable])
>>>>>     }
>>>>> }
>>>>>
>>>>> Then just before creating a Spark Context, I have these two lines:
>>>>>     System.setProperty("spark.serializer",
>>>>> "org.apache.spark.serializer.KryoSerializer")
>>>>>     System.setProperty("spark.kryo.registrator",
>>>>> "geotrellis.spark.KryoRegistrator")
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> The exception itself:
>>>>> Exception in thread "main" org.apache.spark.SparkException: Job
>>>>> failed: Task not serializable: java.io.NotSerializableException:
>>>>> geotrellis.spark.formats.TileIdWritable
>>>>>     - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4",
>>>>> name: "key$1", type: "class java.lang.Object")
>>>>>     - object (class
>>>>> "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", <function1>)
>>>>>     - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4",
>>>>> name: "func$1", type: "interface scala.Function1")
>>>>>     - root object (class
>>>>> "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
>>>>>     at
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
>>>>>     at
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
>>>>>     at
>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>     at
>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>     at
>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>>> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>     at
>>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.org
>>>>> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>     at
>>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>
>>>>> Regards,
>>>>> Ameet
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
