Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-26 Thread npanj
I have both the SPARK-2878 and SPARK-2893 patches.






Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-25 Thread npanj
I am running the code with @rxin's patch in standalone mode.  In my case I am
registering org.apache.spark.graphx.GraphKryoRegistrator.
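
Concretely, the wiring is roughly the following (the app name is just a
placeholder):

    import org.apache.spark.{SparkConf, SparkContext}

    // Enable Kryo and point spark.kryo.registrator at the GraphX registrator;
    // the registrator class has to be resolvable on the executors, which is
    // exactly what SPARK-2878 is about.
    val conf = new SparkConf()
      .setAppName("graphx-job") // placeholder app name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
    val sc = new SparkContext(conf)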

Recently I started to see com.esotericsoftware.kryo.KryoException:
java.io.IOException: failed to uncompress the chunk: PARSING_ERROR.  Has
anyone seen this? Could it be related to this issue?  Here is the trace:
--
vids (org.apache.spark.graphx.impl.VertexAttributeBlock)
    com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
    com.esotericsoftware.kryo.io.Input.require(Input.java:169)
    com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:710)
    com.esotericsoftware.kryo.io.Input.readLong(Input.java:665)
    com.esotericsoftware.kryo.serializers.DefaultArraySerializers$LongArraySerializer.read(DefaultArraySerializers.java:127)
    com.esotericsoftware.kryo.serializers.DefaultArraySerializers$LongArraySerializer.read(DefaultArraySerializers.java:107)
    com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
    com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
    com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
    com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
    org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1054)
    scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
    org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
    org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
    org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
    scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    org.apache.spark.graphx.EdgeRDD$$anonfun$mapEdgePartitions$1.apply(EdgeRDD.scala:87)
    org.apache.spark.graphx.EdgeRDD$$anonfun$mapEdgePartitions$1.apply(EdgeRDD.scala:85)
    org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
    org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:202)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
--
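
One way to check whether the PARSING_ERROR is independent of the registrator
problem would be to temporarily switch the block compression codec away from
snappy, e.g.:

    import org.apache.spark.SparkConf

    // Diagnostic only: use LZF instead of the default snappy codec and see
    // whether the "failed to uncompress the chunk" error still occurs.
    val conf = new SparkConf()
      .set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")

That would only narrow the problem down, not fix it.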







Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-25 Thread Graham Dennis
Hi,

Unless you manually patched Spark, if you have Reynold’s patch for SPARK-2878,
you also have the patch for SPARK-2893, which makes the underlying cause much
more obvious and explicit.  So the error below is unlikely to be related to
SPARK-2878.

Graham

On 26 Aug 2014, at 4:13 am, npanj nitinp...@gmail.com wrote:

 I am running the code with @rxin's patch in standalone mode.  In my case I am
 registering org.apache.spark.graphx.GraphKryoRegistrator.
 
 Recently I started to see com.esotericsoftware.kryo.KryoException:
 java.io.IOException: failed to uncompress the chunk: PARSING_ERROR.  Has
 anyone seen this? Could it be related to this issue?  Here is the trace:
 --
 vids (org.apache.spark.graphx.impl.VertexAttributeBlock)
     com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
     com.esotericsoftware.kryo.io.Input.require(Input.java:169)
     com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:710)
     com.esotericsoftware.kryo.io.Input.readLong(Input.java:665)
     com.esotericsoftware.kryo.serializers.DefaultArraySerializers$LongArraySerializer.read(DefaultArraySerializers.java:127)
     com.esotericsoftware.kryo.serializers.DefaultArraySerializers$LongArraySerializer.read(DefaultArraySerializers.java:107)
     com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
     com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
     com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
     com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
     com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
     com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
     com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
     org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
     org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
     org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
     org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1054)
     scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
     org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
     org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
     scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
     scala.collection.Iterator$class.foreach(Iterator.scala:727)
     scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
     org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
     org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
     org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
     org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
     org.apache.spark.graphx.EdgeRDD$$anonfun$mapEdgePartitions$1.apply(EdgeRDD.scala:87)
     org.apache.spark.graphx.EdgeRDD$$anonfun$mapEdgePartitions$1.apply(EdgeRDD.scala:85)
     org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
     org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
     org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
     org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
     org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
     org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
     org.apache.spark.scheduler.Task.run(Task.scala:54)
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:202)
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 
 --
 
 
 
 
 



Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-19 Thread Debasish Das
@rxin With the fixes, I could run it fine on top of branch-1.0

On master, when running on YARN, I am getting another KryoException:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 247 in stage 52.0 failed 4 times, most recent failure:
Lost task 247.3 in stage 52.0 (TID 10010, tblpmidn05adv-hdp.tdc.vzwcorp.com):
com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException

Serialization trace:
shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

OutLinkBlock does not extend Serializable... Also, I did not see this failure
before.

Was the fix tested on YARN?

@dbtsai did your assembly on YARN run fine, or are you still noticing these
exceptions?

Thanks.

Deb


On Thu, Aug 14, 2014 at 5:48 PM, Reynold Xin r...@databricks.com wrote:

 Here: https://github.com/apache/spark/pull/1948



 On Thu, Aug 14, 2014 at 5:45 PM, Debasish Das debasish.da...@gmail.com
 wrote:

 Is there a fix that I can test ? I have the flows setup for both
 standalone and YARN runs...

 Thanks.
 Deb



 On Thu, Aug 14, 2014 at 10:59 AM, Reynold Xin r...@databricks.com
 wrote:

 Yes, I understand it might not work for custom serializer, but that is a
 much less common path.

 Basically I want a quick fix for 1.1 release (which is coming up soon).
 I would not be comfortable making big changes to class path late into the
 release cycle. We can do that for 1.2.





 On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis graham.den...@gmail.com
 wrote:

 That should work, but would you also make these changes to the
 JavaSerializer?  The API of these is the same so that you can select one or
 the other (or in theory a custom serializer)?  This also wouldn't address
 the problem of shipping custom *serializers* (not kryo registrators) in
 user jars.

 On 14 August 2014 19:23, Reynold Xin r...@databricks.com wrote:

 Graham,

 SparkEnv only creates a KryoSerializer, but as I understand that
 serializer doesn't actually initializes the registrator since that is only
 called when newKryo() is called when KryoSerializerInstance is 
 initialized.

 Basically I'm thinking a quick fix for 1.2:

 1. Add a classLoader field to KryoSerializer; initialize new
 KryoSerializerInstance with that class loader

  2. Set that classLoader to the executor's class loader when Executor
 is initialized.

 Then all deser calls should be using the executor's class loader.




 On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis 
 graham.den...@gmail.com wrote:

 Hi Reynold,

 That would solve this specific issue, but you'd need to be careful
 that you never created a serialiser instance before the first task is
 received.  Currently in Executor.TaskRunner.run a closure serialiser
 instance is created before any application jars are downloaded, but that
 could be moved.  To me, this seems a little fragile.

 However there is a related issue where you can't ship a custom
 serialiser in an application jar because the serialiser is instantiated
 when the SparkEnv object is created, which is before any tasks are 
 received
 by the executor.  The above approach wouldn't help with this problem.
  Additionally, the YARN scheduler currently uses this approach of adding
 the application jar to the Executor classpath, so it would make things a
 bit more uniform.

 Cheers,
 Graham


 On 14 August 2014 17:37, Reynold Xin r...@databricks.com wrote:

 Graham,

 Thanks for working on this. This is an important bug to fix.

  I don't have the whole context and obviously I haven't spent
 nearly as much time on this as you have, but I'm wondering what if we
 always pass the executor's ClassLoader to the Kryo serializer? Will that
 solve this problem?




 On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis 
 graham.den...@gmail.com wrote:

 Hi Deb,

 The only alternative serialiser is the JavaSerialiser (the
 default).  Theoretically Spark supports custom serialisers, but due to 
 a
 related issue, custom serialisers currently can't live in application 
 jars
 and must be available to all executors at launch.  My PR fixes this 
 issue
 as well, allowing custom serialisers to be shipped in application jars.

 Graham


 On 14 August 2014 16:56, Debasish Das debasish.da...@gmail.com
 wrote:

 Sorry I just saw Graham's email after sending my previous email
 about this bug...

 I have been seeing this same issue on our ALS runs last week but I
 thought it was due my hacky way to run mllib 1.1 snapshot on core 
 1.0...

 What's the status of this PR ? Will this fix be back-ported to
 1.0.1 as we are running 1.0.1 stable standalone cluster ?

 Till the PR merges does it make sense to not use Kryo ? What are
 the other recommended efficient serializers ?

 Thanks.
 Deb


 On 

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-15 Thread Debasish Das
I am still a bit confused about why this issue did not show up in 0.9... at
that time there was no spark-submit and the context was constructed with
low-level calls...

The Kryo registration for ALS was always in my application code...

Was this bug introduced in 1.0, or was it always there?
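
For reference, the kind of application-side registrator I mean looks roughly
like this (the class name and registered types are illustrative, not the
exact code):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.mllib.recommendation.Rating
    import org.apache.spark.serializer.KryoRegistrator

    // Illustrative registrator shipped in the application jar and referenced
    // via the spark.kryo.registrator setting.
    class ALSKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Rating])
        kryo.register(classOf[Array[Rating]])
      }
    }
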
 On Aug 14, 2014 5:48 PM, Reynold Xin r...@databricks.com wrote:

 Here: https://github.com/apache/spark/pull/1948



 On Thu, Aug 14, 2014 at 5:45 PM, Debasish Das debasish.da...@gmail.com
 wrote:

 Is there a fix that I can test ? I have the flows setup for both
 standalone and YARN runs...

 Thanks.
 Deb



 On Thu, Aug 14, 2014 at 10:59 AM, Reynold Xin r...@databricks.com
 wrote:

 Yes, I understand it might not work for custom serializer, but that is a
 much less common path.

 Basically I want a quick fix for 1.1 release (which is coming up soon).
 I would not be comfortable making big changes to class path late into the
 release cycle. We can do that for 1.2.





 On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis graham.den...@gmail.com
 wrote:

 That should work, but would you also make these changes to the
 JavaSerializer?  The API of these is the same so that you can select one or
 the other (or in theory a custom serializer)?  This also wouldn't address
 the problem of shipping custom *serializers* (not kryo registrators) in
 user jars.

 On 14 August 2014 19:23, Reynold Xin r...@databricks.com wrote:

 Graham,

 SparkEnv only creates a KryoSerializer, but as I understand that
 serializer doesn't actually initializes the registrator since that is only
 called when newKryo() is called when KryoSerializerInstance is 
 initialized.

 Basically I'm thinking a quick fix for 1.2:

 1. Add a classLoader field to KryoSerializer; initialize new
 KryoSerializerInstance with that class loader

  2. Set that classLoader to the executor's class loader when Executor
 is initialized.

 Then all deser calls should be using the executor's class loader.




 On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis 
 graham.den...@gmail.com wrote:

 Hi Reynold,

 That would solve this specific issue, but you'd need to be careful
 that you never created a serialiser instance before the first task is
 received.  Currently in Executor.TaskRunner.run a closure serialiser
 instance is created before any application jars are downloaded, but that
 could be moved.  To me, this seems a little fragile.

 However there is a related issue where you can't ship a custom
 serialiser in an application jar because the serialiser is instantiated
 when the SparkEnv object is created, which is before any tasks are 
 received
 by the executor.  The above approach wouldn't help with this problem.
  Additionally, the YARN scheduler currently uses this approach of adding
 the application jar to the Executor classpath, so it would make things a
 bit more uniform.

 Cheers,
 Graham


 On 14 August 2014 17:37, Reynold Xin r...@databricks.com wrote:

 Graham,

 Thanks for working on this. This is an important bug to fix.

  I don't have the whole context and obviously I haven't spent
 nearly as much time on this as you have, but I'm wondering what if we
 always pass the executor's ClassLoader to the Kryo serializer? Will that
 solve this problem?




 On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis 
 graham.den...@gmail.com wrote:

 Hi Deb,

 The only alternative serialiser is the JavaSerialiser (the
 default).  Theoretically Spark supports custom serialisers, but due to 
 a
 related issue, custom serialisers currently can't live in application 
 jars
 and must be available to all executors at launch.  My PR fixes this 
 issue
 as well, allowing custom serialisers to be shipped in application jars.

 Graham


 On 14 August 2014 16:56, Debasish Das debasish.da...@gmail.com
 wrote:

 Sorry I just saw Graham's email after sending my previous email
 about this bug...

 I have been seeing this same issue on our ALS runs last week but I
 thought it was due my hacky way to run mllib 1.1 snapshot on core 
 1.0...

 What's the status of this PR ? Will this fix be back-ported to
 1.0.1 as we are running 1.0.1 stable standalone cluster ?

 Till the PR merges does it make sense to not use Kryo ? What are
 the other recommended efficient serializers ?

 Thanks.
 Deb


 On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis 
 graham.den...@gmail.com wrote:

 I now have a complete pull request for this issue that I'd like
 to get
 reviewed and committed.  The PR is available here:
 https://github.com/apache/spark/pull/1890 and includes a
 testcase for the
 issue I described.  I've also submitted a related PR (
 https://github.com/apache/spark/pull/1827) that causes
 exceptions raised
 while attempting to run the custom kryo registrator not to be
 swallowed.

 Thanks,
 Graham


 On 12 August 2014 18:44, Graham Dennis graham.den...@gmail.com
 wrote:

  I've submitted a work-in-progress pull request for this issue
 that I'd
  like feedback on.  See
 https://github.com/apache/spark/pull/1890 . I've
  

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-14 Thread Graham Dennis
Hi Deb,

The only alternative serialiser is the JavaSerialiser (the default).
 Theoretically Spark supports custom serialisers, but due to a related
issue, custom serialisers currently can't live in application jars and must
be available to all executors at launch.  My PR fixes this issue as well,
allowing custom serialisers to be shipped in application jars.

Graham


On 14 August 2014 16:56, Debasish Das debasish.da...@gmail.com wrote:

 Sorry I just saw Graham's email after sending my previous email about this
 bug...

 I have been seeing this same issue on our ALS runs last week but I thought
 it was due my hacky way to run mllib 1.1 snapshot on core 1.0...

 What's the status of this PR ? Will this fix be back-ported to 1.0.1 as we
 are running 1.0.1 stable standalone cluster ?

 Till the PR merges does it make sense to not use Kryo ? What are the other
 recommended efficient serializers ?

 Thanks.
 Deb


 On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis graham.den...@gmail.com
 wrote:

 I now have a complete pull request for this issue that I'd like to get
 reviewed and committed.  The PR is available here:
 https://github.com/apache/spark/pull/1890 and includes a testcase for the
 issue I described.  I've also submitted a related PR (
 https://github.com/apache/spark/pull/1827) that causes exceptions raised
 while attempting to run the custom kryo registrator not to be swallowed.

 Thanks,
 Graham


 On 12 August 2014 18:44, Graham Dennis graham.den...@gmail.com wrote:

  I've submitted a work-in-progress pull request for this issue that I'd
  like feedback on.  See https://github.com/apache/spark/pull/1890 . I've
  also submitted a pull request for the related issue that the exceptions
 hit
  when trying to use a custom kryo registrator are being swallowed:
  https://github.com/apache/spark/pull/1827
 
  The approach in my pull request is to get the Worker processes to
 download
  the application jars and add them to the Executor class path at launch
  time. There are a couple of things that still need to be done before
 this
  can be merged:
  1. At the moment, the first time a task runs in the executor, the
  application jars are downloaded again.  My solution here would be to
 make
  the executor not download any jars that already exist.  Previously, the
  driver  executor kept track of the timestamp of jar files and would
  redownload 'updated' jars, however this never made sense as the previous
  version of the updated jar may have already been loaded into the
 executor,
  so the updated jar may have no effect.  As my current pull request
 removes
  the timestamp for jars, just checking whether the jar exists will allow
 us
  to avoid downloading the jars again.
  2. Tests. :-)
 
  A side-benefit of my pull request is that you will be able to use custom
  serialisers that are distributed in a user jar.  Currently, the
 serialiser
  instance is created in the Executor process before the first task is
  received and therefore before any user jars are downloaded.  As this PR
  adds user jars to the Executor process at launch time, this won't be an
  issue.
 
 
  On 7 August 2014 12:01, Graham Dennis graham.den...@gmail.com wrote:
 
  See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
  the full stacktrace, but it's in the BlockManager/BlockManagerWorker
 where
  it's trying to fulfil a getBlock request for another node.  The
 objects
  that would be in the block haven't yet been serialised, and that then
  causes the deserialisation to happen on that thread.  See
  MemoryStore.scala:102.
 
 
  On 7 August 2014 11:53, Reynold Xin r...@databricks.com wrote:
 
  I don't think it was a conscious design decision to not include the
  application classes in the connection manager serializer. We should
 fix
  that. Where is it deserializing data in that thread?
 
   4 might make sense in the long run, but it adds a lot of complexity
 to
  the code base (whole separate code base, task queue,
 blocking/non-blocking
  logic within task threads) that can be error prone, so I think it is
 best
  to stay away from that right now.
 
 
 
 
 
  On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis 
 graham.den...@gmail.com
  wrote:
 
  Hi Spark devs,
 
  I’ve posted an issue on JIRA (
  https://issues.apache.org/jira/browse/SPARK-2878) which occurs when
  using
  Kryo serialisation with a custom Kryo registrator to register custom
  classes with Kryo.  This is an insidious issue that
  non-deterministically
  causes Kryo to have different ID number -> class name maps on
 different
  nodes, which then causes weird exceptions (ClassCastException,
  ClassNotFoundException, ArrayIndexOutOfBoundsException) at
  deserialisation
  time.  I’ve created a reliable reproduction for the issue here:
  https://github.com/GrahamDennis/spark-kryo-serialisation
 
  I’m happy to try and put a pull request together to try and address
  this,
  but it’s not obvious to me the right way to solve this and I’d like
 to
  get
 

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-14 Thread Reynold Xin
Graham,

Thanks for working on this. This is an important bug to fix.

I don't have the whole context and obviously I haven't spent nearly as much
time on this as you have, but I'm wondering what if we always pass the
executor's ClassLoader to the Kryo serializer? Will that solve this problem?




On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis graham.den...@gmail.com
wrote:

 Hi Deb,

 The only alternative serialiser is the JavaSerialiser (the default).
  Theoretically Spark supports custom serialisers, but due to a related
 issue, custom serialisers currently can't live in application jars and must
 be available to all executors at launch.  My PR fixes this issue as well,
 allowing custom serialisers to be shipped in application jars.

 Graham


 On 14 August 2014 16:56, Debasish Das debasish.da...@gmail.com wrote:

 Sorry I just saw Graham's email after sending my previous email about
 this bug...

 I have been seeing this same issue on our ALS runs last week but I
 thought it was due my hacky way to run mllib 1.1 snapshot on core 1.0...

 What's the status of this PR ? Will this fix be back-ported to 1.0.1 as
 we are running 1.0.1 stable standalone cluster ?

 Till the PR merges does it make sense to not use Kryo ? What are the
 other recommended efficient serializers ?

 Thanks.
 Deb


 On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis graham.den...@gmail.com
 wrote:

 I now have a complete pull request for this issue that I'd like to get
 reviewed and committed.  The PR is available here:
 https://github.com/apache/spark/pull/1890 and includes a testcase for
 the
 issue I described.  I've also submitted a related PR (
 https://github.com/apache/spark/pull/1827) that causes exceptions raised
 while attempting to run the custom kryo registrator not to be swallowed.

 Thanks,
 Graham


 On 12 August 2014 18:44, Graham Dennis graham.den...@gmail.com wrote:

  I've submitted a work-in-progress pull request for this issue that I'd
  like feedback on.  See https://github.com/apache/spark/pull/1890 .
 I've
  also submitted a pull request for the related issue that the
 exceptions hit
  when trying to use a custom kryo registrator are being swallowed:
  https://github.com/apache/spark/pull/1827
 
  The approach in my pull request is to get the Worker processes to
 download
  the application jars and add them to the Executor class path at launch
  time. There are a couple of things that still need to be done before
 this
  can be merged:
  1. At the moment, the first time a task runs in the executor, the
  application jars are downloaded again.  My solution here would be to
 make
  the executor not download any jars that already exist.  Previously, the
  driver  executor kept track of the timestamp of jar files and would
  redownload 'updated' jars, however this never made sense as the
 previous
  version of the updated jar may have already been loaded into the
 executor,
  so the updated jar may have no effect.  As my current pull request
 removes
  the timestamp for jars, just checking whether the jar exists will
 allow us
  to avoid downloading the jars again.
  2. Tests. :-)
 
  A side-benefit of my pull request is that you will be able to use
 custom
  serialisers that are distributed in a user jar.  Currently, the
 serialiser
  instance is created in the Executor process before the first task is
  received and therefore before any user jars are downloaded.  As this PR
  adds user jars to the Executor process at launch time, this won't be an
  issue.
 
 
  On 7 August 2014 12:01, Graham Dennis graham.den...@gmail.com wrote:
 
  See my comment on https://issues.apache.org/jira/browse/SPARK-2878
 for
  the full stacktrace, but it's in the BlockManager/BlockManagerWorker
 where
  it's trying to fulfil a getBlock request for another node.  The
 objects
  that would be in the block haven't yet been serialised, and that then
  causes the deserialisation to happen on that thread.  See
  MemoryStore.scala:102.
 
 
  On 7 August 2014 11:53, Reynold Xin r...@databricks.com wrote:
 
  I don't think it was a conscious design decision to not include the
  application classes in the connection manager serializer. We should
 fix
  that. Where is it deserializing data in that thread?
 
   4 might make sense in the long run, but it adds a lot of complexity
 to
  the code base (whole separate code base, task queue,
 blocking/non-blocking
  logic within task threads) that can be error prone, so I think it is
 best
  to stay away from that right now.
 
 
 
 
 
  On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis 
 graham.den...@gmail.com
  wrote:
 
  Hi Spark devs,
 
  I’ve posted an issue on JIRA (
  https://issues.apache.org/jira/browse/SPARK-2878) which occurs when
  using
  Kryo serialisation with a custom Kryo registrator to register custom
  classes with Kryo.  This is an insidious issue that
  non-deterministically
  causes Kryo to have different ID number -> class name maps on
 different
  nodes, which then causes weird 

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-14 Thread Graham Dennis
Hi Reynold,

That would solve this specific issue, but you'd need to be careful that you
never created a serialiser instance before the first task is received.
 Currently in Executor.TaskRunner.run a closure serialiser instance is
created before any application jars are downloaded, but that could be
moved.  To me, this seems a little fragile.

However there is a related issue where you can't ship a custom serialiser
in an application jar because the serialiser is instantiated when the
SparkEnv object is created, which is before any tasks are received by the
executor.  The above approach wouldn't help with this problem.
 Additionally, the YARN scheduler currently uses this approach of adding
the application jar to the Executor classpath, so it would make things a
bit more uniform.

Cheers,
Graham


On 14 August 2014 17:37, Reynold Xin r...@databricks.com wrote:

 Graham,

 Thanks for working on this. This is an important bug to fix.

 I don't have the whole context and obviously I haven't spent nearly as
 much time on this as you have, but I'm wondering what if we always pass the
 executor's ClassLoader to the Kryo serializer? Will that solve this problem?




 On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis graham.den...@gmail.com
 wrote:

 Hi Deb,

 The only alternative serialiser is the JavaSerialiser (the default).
  Theoretically Spark supports custom serialisers, but due to a related
 issue, custom serialisers currently can't live in application jars and must
 be available to all executors at launch.  My PR fixes this issue as well,
 allowing custom serialisers to be shipped in application jars.

 Graham


 On 14 August 2014 16:56, Debasish Das debasish.da...@gmail.com wrote:

 Sorry I just saw Graham's email after sending my previous email about
 this bug...

 I have been seeing this same issue on our ALS runs last week but I
 thought it was due my hacky way to run mllib 1.1 snapshot on core 1.0...

 What's the status of this PR ? Will this fix be back-ported to 1.0.1 as
 we are running 1.0.1 stable standalone cluster ?

 Till the PR merges does it make sense to not use Kryo ? What are the
 other recommended efficient serializers ?

 Thanks.
 Deb


 On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis graham.den...@gmail.com
 wrote:

 I now have a complete pull request for this issue that I'd like to get
 reviewed and committed.  The PR is available here:
 https://github.com/apache/spark/pull/1890 and includes a testcase for
 the
 issue I described.  I've also submitted a related PR (
 https://github.com/apache/spark/pull/1827) that causes exceptions
 raised
 while attempting to run the custom kryo registrator not to be swallowed.

 Thanks,
 Graham


 On 12 August 2014 18:44, Graham Dennis graham.den...@gmail.com wrote:

  I've submitted a work-in-progress pull request for this issue that I'd
  like feedback on.  See https://github.com/apache/spark/pull/1890 .
 I've
  also submitted a pull request for the related issue that the
 exceptions hit
  when trying to use a custom kryo registrator are being swallowed:
  https://github.com/apache/spark/pull/1827
 
  The approach in my pull request is to get the Worker processes to
 download
  the application jars and add them to the Executor class path at launch
  time. There are a couple of things that still need to be done before
 this
  can be merged:
  1. At the moment, the first time a task runs in the executor, the
  application jars are downloaded again.  My solution here would be to
 make
  the executor not download any jars that already exist.  Previously,
 the
  driver  executor kept track of the timestamp of jar files and would
  redownload 'updated' jars, however this never made sense as the
 previous
  version of the updated jar may have already been loaded into the
 executor,
  so the updated jar may have no effect.  As my current pull request
 removes
  the timestamp for jars, just checking whether the jar exists will
 allow us
  to avoid downloading the jars again.
  2. Tests. :-)
 
  A side-benefit of my pull request is that you will be able to use
 custom
  serialisers that are distributed in a user jar.  Currently, the
 serialiser
  instance is created in the Executor process before the first task is
  received and therefore before any user jars are downloaded.  As this
 PR
  adds user jars to the Executor process at launch time, this won't be
 an
  issue.
 
 
  On 7 August 2014 12:01, Graham Dennis graham.den...@gmail.com
 wrote:
 
  See my comment on https://issues.apache.org/jira/browse/SPARK-2878
 for
  the full stacktrace, but it's in the BlockManager/BlockManagerWorker
 where
  it's trying to fulfil a getBlock request for another node.  The
 objects
  that would be in the block haven't yet been serialised, and that then
  causes the deserialisation to happen on that thread.  See
  MemoryStore.scala:102.
 
 
  On 7 August 2014 11:53, Reynold Xin r...@databricks.com wrote:
 
  I don't think it was a conscious design decision to not include 

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-14 Thread Graham Dennis
In part, my assertion was based on a comment by sryza on my PR (
https://github.com/apache/spark/pull/1890#issuecomment-51805750), but I
thought I had also seen it in the YARN code base.  However, now that I look
for it, I can't find where this happens, so perhaps I was imagining the
YARN behaviour.


On 14 August 2014 17:57, Debasish Das debasish.da...@gmail.com wrote:

 By the way I have seen this same problem while deploying 1.1.0-SNAPSHOT on
 YARN as well...

 So it is a common problem in both standalone and YARN mode deployment...


 On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis graham.den...@gmail.com
 wrote:

 Hi Reynold,

 That would solve this specific issue, but you'd need to be careful that
 you never created a serialiser instance before the first task is received.
  Currently in Executor.TaskRunner.run a closure serialiser instance is
 created before any application jars are downloaded, but that could be
 moved.  To me, this seems a little fragile.

 However there is a related issue where you can't ship a custom serialiser
 in an application jar because the serialiser is instantiated when the
 SparkEnv object is created, which is before any tasks are received by the
 executor.  The above approach wouldn't help with this problem.
  Additionally, the YARN scheduler currently uses this approach of adding
 the application jar to the Executor classpath, so it would make things a
 bit more uniform.

 Cheers,
 Graham


 On 14 August 2014 17:37, Reynold Xin r...@databricks.com wrote:

 Graham,

 Thanks for working on this. This is an important bug to fix.

  I don't have the whole context and obviously I haven't spent nearly as
 much time on this as you have, but I'm wondering what if we always pass the
 executor's ClassLoader to the Kryo serializer? Will that solve this problem?




 On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis graham.den...@gmail.com
  wrote:

 Hi Deb,

 The only alternative serialiser is the JavaSerialiser (the default).
  Theoretically Spark supports custom serialisers, but due to a related
 issue, custom serialisers currently can't live in application jars and must
 be available to all executors at launch.  My PR fixes this issue as well,
 allowing custom serialisers to be shipped in application jars.

 Graham


 On 14 August 2014 16:56, Debasish Das debasish.da...@gmail.com wrote:

 Sorry I just saw Graham's email after sending my previous email about
 this bug...

 I have been seeing this same issue on our ALS runs last week but I
 thought it was due my hacky way to run mllib 1.1 snapshot on core 1.0...

 What's the status of this PR ? Will this fix be back-ported to 1.0.1
 as we are running 1.0.1 stable standalone cluster ?

 Till the PR merges does it make sense to not use Kryo ? What are the
 other recommended efficient serializers ?

 Thanks.
 Deb


 On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis 
 graham.den...@gmail.com wrote:

 I now have a complete pull request for this issue that I'd like to get
 reviewed and committed.  The PR is available here:
 https://github.com/apache/spark/pull/1890 and includes a testcase
 for the
 issue I described.  I've also submitted a related PR (
 https://github.com/apache/spark/pull/1827) that causes exceptions
 raised
 while attempting to run the custom kryo registrator not to be
 swallowed.

 Thanks,
 Graham


 On 12 August 2014 18:44, Graham Dennis graham.den...@gmail.com
 wrote:

  I've submitted a work-in-progress pull request for this issue that
 I'd
  like feedback on.  See https://github.com/apache/spark/pull/1890 .
 I've
  also submitted a pull request for the related issue that the
 exceptions hit
  when trying to use a custom kryo registrator are being swallowed:
  https://github.com/apache/spark/pull/1827
 
  The approach in my pull request is to get the Worker processes to
 download
  the application jars and add them to the Executor class path at
 launch
  time. There are a couple of things that still need to be done
 before this
  can be merged:
  1. At the moment, the first time a task runs in the executor, the
  application jars are downloaded again.  My solution here would be
 to make
  the executor not download any jars that already exist.  Previously,
 the
  driver  executor kept track of the timestamp of jar files and would
  redownload 'updated' jars, however this never made sense as the
 previous
  version of the updated jar may have already been loaded into the
 executor,
  so the updated jar may have no effect.  As my current pull request
 removes
  the timestamp for jars, just checking whether the jar exists will
 allow us
  to avoid downloading the jars again.
  2. Tests. :-)
 
  A side-benefit of my pull request is that you will be able to use
 custom
  serialisers that are distributed in a user jar.  Currently, the
 serialiser
  instance is created in the Executor process before the first task is
  received and therefore before any user jars are downloaded.  As
 this PR
  adds user jars to the 

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-14 Thread Reynold Xin
Graham,

SparkEnv only creates a KryoSerializer, but as I understand it, that serializer
doesn't actually initialize the registrator, since that only happens when
newKryo() is called as the KryoSerializerInstance is initialized.

Basically I'm thinking a quick fix for 1.2:

1. Add a classLoader field to KryoSerializer; initialize new
KryoSerializerInstance with that class loader

2. Set that classLoader to the executor's class loader when Executor is
initialized.

Then all deser calls should be using the executor's class loader.
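
A rough sketch of those two steps (simplified; the class shape here is
illustrative, not the actual Spark code or the eventual patch):

    import com.esotericsoftware.kryo.Kryo

    class SketchKryoSerializer {
      // Step 1: a class-loader field consulted whenever a new Kryo instance
      // is built; it falls back to this class's own loader until overridden.
      @volatile var defaultClassLoader: ClassLoader = getClass.getClassLoader

      def newKryo(): Kryo = {
        val kryo = new Kryo()
        // Resolve classes (including the custom registrator) against the
        // configured loader rather than the calling thread's context loader.
        kryo.setClassLoader(defaultClassLoader)
        kryo
      }
    }

    // Step 2: during Executor initialisation, once the executor's class
    // loader (which contains the application jars) has been created:
    //   kryoSerializer.defaultClassLoader = executorClassLoader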




On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis graham.den...@gmail.com
wrote:

 Hi Reynold,

 That would solve this specific issue, but you'd need to be careful that
 you never created a serialiser instance before the first task is received.
  Currently in Executor.TaskRunner.run a closure serialiser instance is
 created before any application jars are downloaded, but that could be
 moved.  To me, this seems a little fragile.

 However there is a related issue where you can't ship a custom serialiser
 in an application jar because the serialiser is instantiated when the
 SparkEnv object is created, which is before any tasks are received by the
 executor.  The above approach wouldn't help with this problem.
  Additionally, the YARN scheduler currently uses this approach of adding
 the application jar to the Executor classpath, so it would make things a
 bit more uniform.

 Cheers,
 Graham


 On 14 August 2014 17:37, Reynold Xin r...@databricks.com wrote:

 Graham,

 Thanks for working on this. This is an important bug to fix.

  I don't have the whole context and obviously I haven't spent nearly as
 much time on this as you have, but I'm wondering what if we always pass the
 executor's ClassLoader to the Kryo serializer? Will that solve this problem?




 On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis graham.den...@gmail.com
 wrote:

 Hi Deb,

 The only alternative serialiser is the JavaSerialiser (the default).
  Theoretically Spark supports custom serialisers, but due to a related
 issue, custom serialisers currently can't live in application jars and must
 be available to all executors at launch.  My PR fixes this issue as well,
 allowing custom serialisers to be shipped in application jars.

 Graham


 On 14 August 2014 16:56, Debasish Das debasish.da...@gmail.com wrote:

 Sorry I just saw Graham's email after sending my previous email about
 this bug...

 I have been seeing this same issue on our ALS runs last week but I
 thought it was due my hacky way to run mllib 1.1 snapshot on core 1.0...

 What's the status of this PR ? Will this fix be back-ported to 1.0.1 as
 we are running 1.0.1 stable standalone cluster ?

 Till the PR merges does it make sense to not use Kryo ? What are the
 other recommended efficient serializers ?

 Thanks.
 Deb


 On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis graham.den...@gmail.com
  wrote:

 I now have a complete pull request for this issue that I'd like to get
 reviewed and committed.  The PR is available here:
 https://github.com/apache/spark/pull/1890 and includes a testcase for
 the
 issue I described.  I've also submitted a related PR (
 https://github.com/apache/spark/pull/1827) that causes exceptions
 raised
 while attempting to run the custom kryo registrator not to be
 swallowed.

 Thanks,
 Graham


 On 12 August 2014 18:44, Graham Dennis graham.den...@gmail.com
 wrote:

  I've submitted a work-in-progress pull request for this issue that
 I'd
  like feedback on.  See https://github.com/apache/spark/pull/1890 .
 I've
  also submitted a pull request for the related issue that the
 exceptions hit
  when trying to use a custom kryo registrator are being swallowed:
  https://github.com/apache/spark/pull/1827
 
  The approach in my pull request is to get the Worker processes to
 download
  the application jars and add them to the Executor class path at
 launch
  time. There are a couple of things that still need to be done before
 this
  can be merged:
  1. At the moment, the first time a task runs in the executor, the
  application jars are downloaded again.  My solution here would be to
 make
  the executor not download any jars that already exist.  Previously,
 the
  driver  executor kept track of the timestamp of jar files and would
  redownload 'updated' jars, however this never made sense as the
 previous
  version of the updated jar may have already been loaded into the
 executor,
  so the updated jar may have no effect.  As my current pull request
 removes
  the timestamp for jars, just checking whether the jar exists will
 allow us
  to avoid downloading the jars again.
  2. Tests. :-)
 
  A side-benefit of my pull request is that you will be able to use
 custom
  serialisers that are distributed in a user jar.  Currently, the
 serialiser
  instance is created in the Executor process before the first task is
  received and therefore before any user jars are downloaded.  As this
 PR
  adds user jars to the Executor process at 

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-13 Thread Graham Dennis
I now have a complete pull request for this issue that I'd like to get
reviewed and committed.  The PR is available here:
https://github.com/apache/spark/pull/1890 and includes a testcase for the
issue I described.  I've also submitted a related PR (
https://github.com/apache/spark/pull/1827) that causes exceptions raised
while attempting to run the custom kryo registrator not to be swallowed.
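
The gist of that second change, in simplified form rather than the actual
patch, is to propagate the failure instead of logging and continuing:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkException
    import org.apache.spark.serializer.KryoRegistrator

    // Simplified sketch: fail fast when the registrator cannot be loaded or
    // run, rather than swallowing the exception.
    def runRegistrator(kryo: Kryo, registratorClass: String, loader: ClassLoader): Unit = {
      try {
        val registrator = Class.forName(registratorClass, true, loader)
          .newInstance().asInstanceOf[KryoRegistrator]
        registrator.registerClasses(kryo)
      } catch {
        case e: Exception =>
          // Swallowing this is what leaves Kryo's registrations inconsistent
          // across nodes; propagating it surfaces the root cause immediately.
          throw new SparkException("Failed to run spark.kryo.registrator", e)
      }
    }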

Thanks,
Graham


On 12 August 2014 18:44, Graham Dennis graham.den...@gmail.com wrote:

 I've submitted a work-in-progress pull request for this issue that I'd
 like feedback on.  See https://github.com/apache/spark/pull/1890 . I've
 also submitted a pull request for the related issue that the exceptions hit
 when trying to use a custom kryo registrator are being swallowed:
 https://github.com/apache/spark/pull/1827

 The approach in my pull request is to get the Worker processes to download
 the application jars and add them to the Executor class path at launch
 time. There are a couple of things that still need to be done before this
 can be merged:
 1. At the moment, the first time a task runs in the executor, the
 application jars are downloaded again.  My solution here would be to make
 the executor not download any jars that already exist.  Previously, the
 driver  executor kept track of the timestamp of jar files and would
 redownload 'updated' jars, however this never made sense as the previous
 version of the updated jar may have already been loaded into the executor,
 so the updated jar may have no effect.  As my current pull request removes
 the timestamp for jars, just checking whether the jar exists will allow us
 to avoid downloading the jars again.
 2. Tests. :-)

 A side-benefit of my pull request is that you will be able to use custom
 serialisers that are distributed in a user jar.  Currently, the serialiser
 instance is created in the Executor process before the first task is
 received and therefore before any user jars are downloaded.  As this PR
 adds user jars to the Executor process at launch time, this won't be an
 issue.


 On 7 August 2014 12:01, Graham Dennis graham.den...@gmail.com wrote:

 See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
 the full stacktrace, but it's in the BlockManager/BlockManagerWorker where
 it's trying to fulfil a getBlock request for another node.  The objects
 that would be in the block haven't yet been serialised, and that then
 causes the deserialisation to happen on that thread.  See
 MemoryStore.scala:102.


 On 7 August 2014 11:53, Reynold Xin r...@databricks.com wrote:

 I don't think it was a conscious design decision to not include the
 application classes in the connection manager serializer. We should fix
 that. Where is it deserializing data in that thread?

  4 might make sense in the long run, but it adds a lot of complexity to
 the code base (whole separate code base, task queue, blocking/non-blocking
 logic within task threads) that can be error prone, so I think it is best
 to stay away from that right now.





 On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis graham.den...@gmail.com
 wrote:

 Hi Spark devs,

 I’ve posted an issue on JIRA (
 https://issues.apache.org/jira/browse/SPARK-2878) which occurs when
 using
 Kryo serialisation with a custom Kryo registrator to register custom
 classes with Kryo.  This is an insidious issue that
 non-deterministically
 causes Kryo to have different ID number -> class name maps on different
 nodes, which then causes weird exceptions (ClassCastException,
 ClassNotFoundException, ArrayIndexOutOfBoundsException) at
 deserialisation
 time.  I’ve created a reliable reproduction for the issue here:
 https://github.com/GrahamDennis/spark-kryo-serialisation

 I’m happy to try and put a pull request together to try and address
 this,
 but it’s not obvious to me the right way to solve this and I’d like to
 get
 feedback / ideas on how to address this.

 The root cause of the problem is a "Failed to run
 spark.kryo.registrator"
 error which non-deterministically occurs in some executor processes
 during
 operation.  My custom Kryo registrator is in the application jar, and
 it is
 accessible on the worker nodes.  This is demonstrated by the fact that
 most
 of the time the custom kryo registrator is successfully run.

 What’s happening is that Kryo serialisation/deserialisation is happening
 most of the time on an “Executor task launch worker” thread, which has
 the
 thread's class loader set to contain the application jar.  This happens
 in
 `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
 tell, it is only these threads that have access to the application jar
 (that contains the custom Kryo registrator).  However, the
 ConnectionManager threads sometimes need to serialise/deserialise
 objects
 to satisfy “getBlock” requests when the objects haven’t previously been
 serialised.  As the ConnectionManager threads don’t have the application
 jar available from their class 

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-12 Thread Graham Dennis
I've submitted a work-in-progress pull request for this issue that I'd like
feedback on.  See https://github.com/apache/spark/pull/1890 . I've also
submitted a pull request for the related issue that the exceptions hit when
trying to use a custom kryo registrator are being swallowed:
https://github.com/apache/spark/pull/1827

The approach in my pull request is to get the Worker processes to download
the application jars and add them to the Executor class path at launch
time. There are a couple of things that still need to be done before this
can be merged:
1. At the moment, the first time a task runs in the executor, the
application jars are downloaded again.  My solution here would be to make
the executor not download any jars that already exist.  Previously, the
driver and executor kept track of the timestamp of jar files and would
redownload 'updated' jars; however, this never made sense, as the previous
version of the updated jar may have already been loaded into the executor,
so the updated jar may have no effect.  As my current pull request removes
the timestamp for jars, just checking whether the jar exists will allow us
to avoid downloading the jars again (see the sketch after this list).
2. Tests. :-)
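
For point 1, a minimal sketch of the kind of existence check I have in mind
(simplified; names are placeholders, not the actual code in the PR):

    import java.io.File
    import java.net.URL
    import java.nio.file.{Files, StandardCopyOption}

    // Only fetch a jar when it is not already present locally; with the
    // per-jar timestamps removed, existence is the only check needed.
    def fetchJarIfMissing(jarUrl: URL, jarDir: File): File = {
      val localJar = new File(jarDir, new File(jarUrl.getPath).getName)
      if (!localJar.exists()) {
        val in = jarUrl.openStream()
        try Files.copy(in, localJar.toPath, StandardCopyOption.REPLACE_EXISTING)
        finally in.close()
      }
      localJar
    }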

A side-benefit of my pull request is that you will be able to use custom
serialisers that are distributed in a user jar.  Currently, the serialiser
instance is created in the Executor process before the first task is
received and therefore before any user jars are downloaded.  As this PR
adds user jars to the Executor process at launch time, this won't be an
issue.


On 7 August 2014 12:01, Graham Dennis graham.den...@gmail.com wrote:

 See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
 the full stacktrace, but it's in the BlockManager/BlockManagerWorker where
 it's trying to fulfil a getBlock request for another node.  The objects
 that would be in the block haven't yet been serialised, and that then
 causes the deserialisation to happen on that thread.  See
 MemoryStore.scala:102.


 On 7 August 2014 11:53, Reynold Xin r...@databricks.com wrote:

 I don't think it was a conscious design decision to not include the
 application classes in the connection manager serializer. We should fix
 that. Where is it deserializing data in that thread?

 4 might make sense in the long run, but it adds a lot of complexity to
 the code base (whole separate code base, task queue, blocking/non-blocking
 logic within task threads) that can be error prone, so I think it is best
 to stay away from that right now.





 On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis graham.den...@gmail.com
 wrote:

 Hi Spark devs,

 I’ve posted an issue on JIRA (
 https://issues.apache.org/jira/browse/SPARK-2878) which occurs when
 using
 Kryo serialisation with a custom Kryo registrator to register custom
 classes with Kryo.  This is an insidious issue that non-deterministically
 causes Kryo to have different ID number -> class name maps on different
 nodes, which then causes weird exceptions (ClassCastException,
 ClassNotFoundException, ArrayIndexOutOfBoundsException) at
 deserialisation
 time.  I’ve created a reliable reproduction for the issue here:
 https://github.com/GrahamDennis/spark-kryo-serialisation

 I’m happy to try and put a pull request together to try and address this,
 but it’s not obvious to me the right way to solve this and I’d like to
 get
 feedback / ideas on how to address this.

 The root cause of the problem is a "Failed to run spark.kryo.registrator"
 error which non-deterministically occurs in some executor processes
 during
 operation.  My custom Kryo registrator is in the application jar, and it
 is
 accessible on the worker nodes.  This is demonstrated by the fact that
 most
 of the time the custom kryo registrator is successfully run.

 What’s happening is that Kryo serialisation/deserialisation is happening
 most of the time on an “Executor task launch worker” thread, which has
 the
 thread's class loader set to contain the application jar.  This happens
 in
 `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
 tell, it is only these threads that have access to the application jar
 (that contains the custom Kryo registrator).  However, the
 ConnectionManager threads sometimes need to serialise/deserialise objects
 to satisfy “getBlock” requests when the objects haven’t previously been
 serialised.  As the ConnectionManager threads don’t have the application
 jar available from their class loader, when it tries to look up the
 custom
 Kryo registrator, this fails.  Spark then swallows this exception, which
 results in a different ID number -> class mapping for this kryo instance,
 and this then causes deserialisation errors later on a different node.

 A related issue to the issue reported in SPARK-2878 is that Spark
 probably
 shouldn’t swallow the ClassNotFound exception for custom Kryo
 registrators.
  The user has explicitly specified this class, and if it
 deterministically
 can’t be found, then it 

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-06 Thread Reynold Xin
I don't think it was a conscious design decision to not include the
application classes in the connection manager serializer. We should fix
that. Where is it deserializing data in that thread?

#4 might make sense in the long run, but it adds a lot of complexity to the
code base (whole separate code base, task queue, blocking/non-blocking
logic within task threads) that can be error prone, so I think it is best
to stay away from that right now.





On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis graham.den...@gmail.com
wrote:

 Hi Spark devs,

 I’ve posted an issue on JIRA (
 https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using
 Kryo serialisation with a custom Kryo registrator to register custom
 classes with Kryo.  This is an insidious issue that non-deterministically
 causes Kryo to have different ID number = class name maps on different
 nodes, which then causes weird exceptions (ClassCastException,
 ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation
 time.  I’ve created a reliable reproduction for the issue here:
 https://github.com/GrahamDennis/spark-kryo-serialisation

 I’m happy to try and put a pull request together to try and address this,
 but it’s not obvious to me the right way to solve this and I’d like to get
 feedback / ideas on how to address this.

 The root cause of the problem is a “Failed to run spark.kryo.registrator”
 error which non-deterministically occurs in some executor processes during
 operation.  My custom Kryo registrator is in the application jar, and it is
 accessible on the worker nodes.  This is demonstrated by the fact that most
 of the time the custom kryo registrator is successfully run.

 What’s happening is that Kryo serialisation/deserialisation is happening
 most of the time on an “Executor task launch worker” thread, which has the
 thread's class loader set to contain the application jar.  This happens in
 `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
 tell, it is only these threads that have access to the application jar
 (that contains the custom Kryo registrator).  However, the
 ConnectionManager threads sometimes need to serialise/deserialise objects
 to satisfy “getBlock” requests when the objects haven’t previously been
 serialised.  As the ConnectionManager threads don’t have the application
 jar available from their class loader, when it tries to look up the custom
 Kryo registrator, this fails.  Spark then swallows this exception, which
 results in a different ID number -> class mapping for this kryo instance,
 and this then causes deserialisation errors later on a different node.

 A related issue to the one reported in SPARK-2878 is that Spark probably
 shouldn’t swallow the ClassNotFound exception for custom Kryo registrators.
  The user has explicitly specified this class, and if it deterministically
 can’t be found, then it may cause problems at serialisation /
 deserialisation time.  If only sometimes it can’t be found (as in this
 case), then it leads to a data corruption issue later on.  Either way,
 we’re better off dying due to the ClassNotFound exception earlier, than the
 weirder errors later on.
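
 As a rough sketch of the fail-fast behaviour I mean (a hypothetical helper,
 not Spark's actual code): if the user explicitly configured a registrator
 and it cannot be loaded or run, abort loudly instead of continuing with a
 silently unregistered Kryo instance.

     import com.esotericsoftware.kryo.Kryo
     import org.apache.spark.SparkException
     import org.apache.spark.serializer.KryoRegistrator

     object FailFastRegistrator {
       def run(kryo: Kryo, registratorName: String, loader: ClassLoader): Unit =
         try {
           val registrator = Class.forName(registratorName, true, loader)
             .newInstance()
             .asInstanceOf[KryoRegistrator]
           registrator.registerClasses(kryo)
         } catch {
           case e: Exception =>
             // Merely logging here leaves this Kryo with a different ID table
             // than its peers and only shows up as corrupt data much later;
             // rethrowing points straight at the real problem.
             throw new SparkException("Failed to run spark.kryo.registrator", e)
         }
     }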

 I have some ideas on potential solutions to this issue, but I’m keen for
 experienced eyes to critique these approaches:

 1. The simplest approach to fixing this would be to just make the
 application jar available to the connection manager threads, but I’m
 guessing it’s a design decision to isolate the application jar to just the
 executor task runner threads.  Also, I don’t know if there are any other
 threads that might be interacting with kryo serialisation /
 deserialisation.
 2. Before looking up the custom Kryo registrator, change the thread’s class
 loader to include the application jar, then restore the class loader after
 the kryo registrator has been run (see the sketch after this list).  I don’t
 know if this would have any other side-effects.
 3. Always serialise / deserialise on the existing TaskRunner threads,
 rather than delaying serialisation until later, when it can be done only if
 needed.  This approach would probably have negative performance
 consequences.
 4. Create a new dedicated thread pool for lazy serialisation /
 deserialisation that has the application jar on the class path.
  Serialisation / deserialisation would be the only thing these threads do,
 and this would minimise conflicts / interactions between the application
 jar and other jars.
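
 To illustrate approach 2, here is a rough sketch (hypothetical helper, not
 Spark's actual code) of swapping the thread's context class loader to the
 application jar for the duration of the registrator lookup and restoring it
 afterwards:

     import com.esotericsoftware.kryo.Kryo
     import org.apache.spark.serializer.KryoRegistrator

     object RegistratorClassLoaderSwap {
       // Run `body` with `loader` as the context class loader, restoring the
       // previous loader even if `body` throws.
       def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
         val thread = Thread.currentThread()
         val previous = thread.getContextClassLoader
         thread.setContextClassLoader(loader)
         try body
         finally thread.setContextClassLoader(previous)
       }

       def runRegistrator(kryo: Kryo, registratorName: String, appLoader: ClassLoader): Unit =
         withContextClassLoader(appLoader) {
           // Look the registrator up via the (now swapped) context class loader.
           val contextLoader = Thread.currentThread().getContextClassLoader
           val registrator = Class.forName(registratorName, true, contextLoader)
             .newInstance()
             .asInstanceOf[KryoRegistrator]
           registrator.registerClasses(kryo)
         }
     }

 The finally block guarantees the previous loader is always restored, which
 should limit the side-effects I was worried about.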

 #4 sounds like the best approach to me, but I think it would require
 considerable knowledge of Spark internals, which is beyond me at present.
  Does anyone have any better (and ideally simpler) ideas?

 Cheers,

 Graham



Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-06 Thread Graham Dennis
See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for the
full stacktrace, but it's in the BlockManager/BlockManagerWorker where it's
trying to fulfil a getBlock request for another node.  The objects that
would be in the block haven't yet been serialised, and that then causes the
deserialisation to happen on that thread.  See MemoryStore.scala:102.


On 7 August 2014 11:53, Reynold Xin r...@databricks.com wrote:

 I don't think it was a conscious design decision to not include the
 application classes in the connection manager serializer. We should fix
 that. Where is it deserializing data in that thread?

 #4 might make sense in the long run, but it adds a lot of complexity to the
 code base (whole separate code base, task queue, blocking/non-blocking
 logic within task threads) that can be error prone, so I think it is best
 to stay away from that right now.





 On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis graham.den...@gmail.com
 wrote:

 Hi Spark devs,

 I’ve posted an issue on JIRA (
 https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using
 Kryo serialisation with a custom Kryo registrator to register custom
 classes with Kryo.  This is an insidious issue that non-deterministically
 causes Kryo to have different ID number -> class name maps on different
 nodes, which then causes weird exceptions (ClassCastException,
 ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation
 time.  I’ve created a reliable reproduction for the issue here:
 https://github.com/GrahamDennis/spark-kryo-serialisation

 I’m happy to try and put a pull request together to try and address this,
 but it’s not obvious to me the right way to solve this and I’d like to get
 feedback / ideas on how to address this.

 The root cause of the problem is a “Failed to run spark.kryo.registrator”
 error which non-deterministically occurs in some executor processes during
 operation.  My custom Kryo registrator is in the application jar, and it is
 accessible on the worker nodes.  This is demonstrated by the fact that most
 of the time the custom kryo registrator is successfully run.

 What’s happening is that Kryo serialisation/deserialisation is happening
 most of the time on an “Executor task launch worker” thread, which has the
 thread's class loader set to contain the application jar.  This happens in
 `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
 tell, it is only these threads that have access to the application jar
 (that contains the custom Kryo registrator).  However, the
 ConnectionManager threads sometimes need to serialise/deserialise objects
 to satisfy “getBlock” requests when the objects haven’t previously been
 serialised.  As the ConnectionManager threads don’t have the application
 jar available from their class loader, when it tries to look up the custom
 Kryo registrator, this fails.  Spark then swallows this exception, which
 results in a different ID number -> class mapping for this kryo instance,
 and this then causes deserialisation errors later on a different node.

 A related issue to the one reported in SPARK-2878 is that Spark probably
 shouldn’t swallow the ClassNotFound exception for custom Kryo registrators.
  The user has explicitly specified this class, and if it deterministically
 can’t be found, then it may cause problems at serialisation /
 deserialisation time.  If only sometimes it can’t be found (as in this
 case), then it leads to a data corruption issue later on.  Either way,
 we’re better off dying due to the ClassNotFound exception earlier, than the
 weirder errors later on.

 I have some ideas on potential solutions to this issue, but I’m keen for
 experienced eyes to critique these approaches:

 1. The simplest approach to fixing this would be to just make the
 application jar available to the connection manager threads, but I’m
 guessing it’s a design decision to isolate the application jar to just the
 executor task runner threads.  Also, I don’t know if there are any other
 threads that might be interacting with kryo serialisation /
 deserialisation.
 2. Before looking up the custom Kryo registrator, change the thread’s class
 loader to include the application jar, then restore the class loader after
 the kryo registrator has been run.  I don’t know if this would have any
 other side-effects.
 3. Always serialise / deserialise on the existing TaskRunner threads,
 rather than delaying serialisation until later, when it can be done only if
 needed.  This approach would probably have negative performance
 consequences.
 4. Create a new dedicated thread pool for lazy serialisation /
 deserialisation that has the application jar on the class path.
  Serialisation / deserialisation would be the only thing these threads do,
 and this would minimise conflicts / interactions between the application
 jar and other jars.

 #4 sounds like the best approach to me, but I think it would require
 considerable knowledge of