Re: NegativeArraySizeException when doing joins on skewed data
I ran into this problem yesterday, but outside of the context of Spark. It's a limitation of Kryo's IdentityObjectIntMap. In Spark you might try using the built-in Java serializer instead.

Sent from the Apache Spark User List mailing list archive at Nabble.com.
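[Editor's note: a minimal sketch of the failure mode being described, not Kryo's actual resize code. A hash map that doubles its backing array on resize eventually computes a capacity that overflows a signed 32-bit int, and allocating an array with that negative size is exactly what raises NegativeArraySizeException, as seen in IdentityObjectIntMap.resize in the traces below.]

```java
public class NegativeCapacityDemo {
    public static void main(String[] args) {
        // Doubling a capacity that is already 2^30 overflows int:
        int capacity = 1 << 30;   // 1,073,741,824
        int next = capacity << 1; // wraps to Integer.MIN_VALUE (negative)
        System.out.println("next capacity = " + next);
        try {
            // This is the shape of what a resize would attempt; a negative
            // length throws before any memory is allocated.
            int[] table = new int[next];
            System.out.println(table.length); // never reached
        } catch (NegativeArraySizeException e) {
            System.out.println("NegativeArraySizeException, as in the Kryo stack traces");
        }
    }
}
```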
Re: NegativeArraySizeException when doing joins on skewed data
Hi Tristan,

Did upgrading to Kryo 3 help?

Thanks,
Soila

On Sun, Mar 1, 2015 at 2:48 PM, Tristan Blakers wrote:
> Yeah, I implemented the same solution. It seems to kick in around the 4B
> mark, but looking at the log I suspect it's probably a function of the
> number of unique objects more than anything. I definitely don't have more
> than 2B unique objects.
>
> Will try the same test on Kryo 3 and see if it goes away.
>
> T
>
> On 27 February 2015 at 06:21, Soila Pertet Kavulya wrote:
>>
>> Thanks Tristan,
>>
>> I ran into a similar issue with broadcast variables. I worked around
>> it by estimating the size of the object I want to broadcast, splitting
>> it up into chunks of less than 2 GB, then doing multiple broadcasts.
>> This approach worked pretty well for broadcast variables under 10 GB on
>> our system. However, for larger variables the spills to disk made
>> progress painfully slow, so we need to do regular joins.
>>
>> Do you know if there are any efforts to get Kryo to support objects
>> larger than a couple of GB?
>>
>> Soila
>>
>> On Wed, Feb 25, 2015 at 11:06 PM, Tristan Blakers wrote:
>>> I get the same exception simply by doing a large broadcast of about
>>> 6 GB. Note that I'm broadcasting a small number (~3m) of fat objects.
>>> There's plenty of free RAM. This and related Kryo exceptions seem to
>>> crop up whenever an object graph of more than a couple of GB gets
>>> passed around.
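[Editor's note: Soila's chunking workaround can be sketched as follows. The greedy packing by estimated size is the whole idea; the 2 GB threshold matches the limits discussed in this thread, while the per-item size estimator is a caller-supplied assumption, and each resulting chunk would then be broadcast separately (e.g. via `sc.broadcast(chunk)`), which is not shown here.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

public class ChunkedBroadcast {
    // Stay under the ~2 GB limits that Kryo and shuffle blocks hit.
    static final long MAX_CHUNK_BYTES = 2L * 1024 * 1024 * 1024;

    // Greedily pack items into chunks whose estimated total size stays
    // below the limit. estimateSize is a caller-supplied per-item estimate.
    static <T> List<List<T>> chunk(List<T> items, ToLongFunction<T> estimateSize) {
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long currentBytes = 0;
        for (T item : items) {
            long size = estimateSize.applyAsLong(item);
            if (!current.isEmpty() && currentBytes + size > MAX_CHUNK_BYTES) {
                chunks.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(item);
            currentBytes += size;
        }
        if (!current.isEmpty()) chunks.add(current);
        // Each chunk is now individually small enough to broadcast.
        return chunks;
    }
}
```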
Re: NegativeArraySizeException when doing joins on skewed data
Hi Imran,

I can confirm this still happens when calling Kryo serialisation directly, without Spark (note: I'm using Java). The output file is at about 440 MB at the time of the crash. Kryo is version 2.21.

When I get a chance I'll see if I can make a shareable test case and try Kryo 3.0. I doubt they'd be interested in a bug report on 2.21?

Cheers
Tristan

On 27 February 2015 at 07:20, Imran Rashid wrote:
> Hi Tristan,
>
> At first I thought you were just hitting another instance of
> https://issues.apache.org/jira/browse/SPARK-1391, but I actually think
> it's entirely related to Kryo. Would it be possible for you to try
> serializing your object using Kryo, without involving Spark at all? [...]
Re: NegativeArraySizeException when doing joins on skewed data
Hi Tristan,

At first I thought you were just hitting another instance of
https://issues.apache.org/jira/browse/SPARK-1391, but I actually think it's
entirely related to Kryo. Would it be possible for you to try serializing
your object using Kryo, without involving Spark at all? If you are
unfamiliar with Kryo, you could just try something like this; it would also
be OK to try out the utils in Spark to do it, something like:

import java.io.FileOutputStream

val outputStream = new FileOutputStream("/some/local/path/doesn't/really/matter/just/delete/me/afterwards")
val kryoSer = new org.apache.spark.serializer.KryoSerializer(sparkConf)
val kryoStreamSer = kryoSer.newInstance().serializeStream(outputStream)
kryoStreamSer.writeObject(yourBigObject).close()

My guess is that this will fail. There is a little of Spark's wrapping code
involved here too, but I suspect the error is out of our control. From the
error, it seems like whatever object you are trying to serialize has more
than 2B references:

Caused by: java.lang.NegativeArraySizeException
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)

Though that is rather surprising -- it doesn't even seem possible to me
with an object that is only 6 GB.

There are a handful of other size restrictions and tuning parameters that
come with Kryo as well. It would be good for us to write up some docs on
those limitations, as well as work with the Kryo devs to see which ones can
be removed. (E.g., another one that I just noticed from browsing the code
is that even when writing to a stream, Kryo has an internal buffer of
limited size, which it periodically flushes. Perhaps we can get Kryo to
turn off that buffer, or we can at least get it to flush more often.)

thanks,
Imran

On Thu, Feb 26, 2015 at 1:06 AM, Tristan Blakers wrote:
> I get the same exception simply by doing a large broadcast of about 6 GB.
> Note that I'm broadcasting a small number (~3m) of fat objects. There's
> plenty of free RAM.
> This and related Kryo exceptions seem to crop up whenever an object graph
> of more than a couple of GB gets passed around.
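[Editor's note: Imran's isolation test above is Scala against Spark's KryoSerializer wrapper. A plain-Java analogue of the same idea, using only standard Java serialization, is sketched below; the `Record` class is a made-up stand-in for the "fat objects" being broadcast. The point is the technique, serializing the object graph with no Spark involved, so a failure can be attributed to the serializer itself.]

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SerializeOutsideSpark {
    // Hypothetical stand-in for the fat objects being broadcast.
    static class Record implements Serializable {
        final long id;
        Record(long id) { this.id = id; }
    }

    // Serialize the graph with no Spark involved; if this throws, the
    // problem lies in serialization, not in Spark's broadcast machinery.
    static long serializedSize(Object graph) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(graph);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        List<Record> graph = new ArrayList<>();
        for (long i = 0; i < 1000; i++) graph.add(new Record(i));
        System.out.println("serialized " + serializedSize(graph) + " bytes");
    }
}
```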
Re: NegativeArraySizeException when doing joins on skewed data
I get the same exception simply by doing a large broadcast of about 6 GB. Note that I'm broadcasting a small number (~3m) of fat objects. There's plenty of free RAM. This and related Kryo exceptions seem to crop up whenever an object graph of more than a couple of GB gets passed around.

        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
        at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)

Caused by: java.lang.NegativeArraySizeException
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
        at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
        ... 23 more

On 26 February 2015 at 03:49, soila wrote:
> I have been running into NegativeArraySizeExceptions when doing joins on
> data with very skewed key distributions in Spark 1.2.0. I found a previous
> post that mentioned that this exception arises when the size of the blocks
> spilled during the shuffle exceeds 2 GB. The post recommended increasing
> the number of partitions. I tried increasing the number of partitions, and
> using the RangePartitioner instead of the HashPartitioner, but still
> encountered the problem.
>
> Does Spark support skewed joins similar to Pig?
>
> com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
> Serialization trace:
> otherElements (org.apache.spark.util.collection.CompactBuffer)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>         at com.esotericsoftware.kryo.serializers.FieldSer
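[Editor's note: on soila's question, Spark in the 1.x era had no built-in skewed-join operator like Pig's `skewed` join. A common manual workaround is key salting: on the large, skewed side, append a random suffix to each key so a hot key is spread across several partitions; on the small side, replicate each row under every possible suffix so the salted keys still match at join time. The class and method names and the bucket count below are made up for illustration; this shows only the key transformation, not the Spark join calls.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SaltedJoin {
    static final int SALT_BUCKETS = 8; // illustrative fan-out for hot keys

    // Large side: spread a hot key across SALT_BUCKETS salted keys by
    // picking a random suffix per row.
    static String saltLeft(String key, Random rng) {
        return key + "#" + rng.nextInt(SALT_BUCKETS);
    }

    // Small side: replicate the key under every possible salt so every
    // salted left-side key finds its match.
    static List<String> saltRight(String key) {
        List<String> salted = new ArrayList<>(SALT_BUCKETS);
        for (int i = 0; i < SALT_BUCKETS; i++) salted.add(key + "#" + i);
        return salted;
    }
}
```

The trade-off is that the small side grows by a factor of SALT_BUCKETS, so the fan-out should only be applied to keys known to be hot.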