Well, reading your logs, here is what happens :
You do a combineByKey (so you have a join probably somewhere), which
spills on disk because it's too big. To spill on disk it serializes, and
the blocks are > 2GB.
From a 2GB dataset, it's easy to exand to several TB
Increase parallelism, make sure that your combineByKey has enough
different keys, and see what happens.
Guillaume
Thank you, Guillaume, my dataset is not that large, it's totally ~2GB
2014-10-20 16:58 GMT+08:00 Guillaume Pitel <guillaume.pi...@exensa.com
<mailto:guillaume.pi...@exensa.com>>:
Hi,
It happened to me with blocks which take more than 1 or 2 GB once
serialized
I think the problem was that during serialization, a Byte Array is
created, and arrays in java are indexed by ints. When the
serializer needs to increase the buffer size, it does so somehow,
but then writing in the array leads to an error.
Don't know if your problem is the same, but maybe.
In general Java or Java libraries do not check for oversized
arrays, which is really bad when you play with big data.
Guillaume
The exception drives me crazy, because it occurs randomly.
I didn't know which line of my code causes this exception.
I didn't even understand what "KryoException:
java.lang.NegativeArraySizeException" means, or even implies?
14/10/20 15:59:01 WARN scheduler.TaskSetManager: Lost task 32.2
in stage 0.0 (TID 181, gs-server-1000):
com.esotericsoftware.kryo.KryoException:
java.lang.NegativeArraySizeException
Serialization trace:
value (org.apache.spark.sql.catalyst.expressions.MutableAny)
values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
otherElements (org.apache.spark.util.collection.CompactBuffer)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38)
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:203)
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:90)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:89)
org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625)
org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
--
eXenSa
*Guillaume PITEL, Président*
+33(0)626 222 431
eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705
--
eXenSa
*Guillaume PITEL, Président*
+33(0)626 222 431
eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705