We are trying to use Kryo serialization, but with Kryo serialization turned ON the memory consumption of the persisted RDD does not change. We have tried this on multiple sets of data. We have also checked the Kryo serialization logs and confirmed that Kryo is being used.
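For context, Kryo was enabled in the usual way. This is only a minimal sketch of that kind of setup, not our exact code: MyKryoRegistrator is an illustrative name, the registered classes are just the array types from the script below, and it assumes Spark 0.9-style SparkConf (on older versions the same properties can be set with System.setProperty before creating the context):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Unregistered classes make Kryo write the full class name alongside
    // every object, which can cancel much of its space advantage.
    kryo.register(classOf[Array[Long]])
    kryo.register(classOf[Array[Float]])
    kryo.register(classOf[Array[String]])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyKryoRegistrator")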
Can somebody please help us with this? The script we used is given below.

SCRIPT:

import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable.Buffer
import scala.Array
import scala.math.Ordering.Implicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.RangePartitioner
import org.apache.spark.HashPartitioner

// For Kryo logging
import com.esotericsoftware.minlog.Log
import com.esotericsoftware.minlog.Log._
Log.set(LEVEL_TRACE)

val query =
  "select array(level_1, level_2, level_3, level_4, level_5, level_6, level_7, level_8, level_9, level_10, " +
  "level_11, level_12, level_13, level_14, level_15, level_16, level_17, level_18, level_19, level_20, " +
  "level_21, level_22, level_23, level_24, level_25) as unitids, class, cuts, type, data " +
  "from table1 p join table2 b on (p.UnitId = b.unit_id) " +
  "where runid = 912 and b.snapshotid = 220 and p.UnitId = b.unit_id"

// Pull the query results into an RDD keyed by (unitids, class, cuts).
val rows: RDD[((Buffer[Any], String, Buffer[Any]), (String, Buffer[Any]))] =
  sc.sql2rdd(query).map(row =>
    ((row.getList("unitids").asInstanceOf[java.util.List[Any]].asScala,
      row.getString("class"),
      row.getList("cuts").asInstanceOf[java.util.List[Any]].asScala),
     (row.getString("type"),
      row.getList("data").asInstanceOf[java.util.List[Any]].asScala)))

// Unwrap the Hive writables in "data" into a primitive Float array.
var rows2Array: RDD[((Buffer[Any], String, Buffer[Any]), (String, Array[Float]))] =
  rows.map(row => (row._1, (row._2._1,
    row._2._2.map {
      case floatWritable: org.apache.hadoop.io.FloatWritable => floatWritable.get
      case lazyFloat: org.apache.hadoop.hive.serde2.`lazy`.LazyFloat => lazyFloat.getWritableObject().get
      case y => println("unknown data type " + y + " : "); 0f
    }.toArray)))

// Unwrap the Hive writables in "unitids" into a primitive Long array.
var allArrays: RDD[((Array[Long], String, Buffer[Any]), (String, Array[Float]))] =
  rows2Array.map(row => ((row._1._1.map {
    case longWritable: org.apache.hadoop.io.LongWritable => longWritable.get
    case lazyLong: org.apache.hadoop.hive.serde2.`lazy`.LazyLong => lazyLong.getWritableObject().get
    case x => println("unknown data type " + x + " : "); 0L
  }.toArray, row._1._2, row._1._3), row._2))

// Convert "cuts" to an Array[String].
var dataRdd: RDD[((Array[Long], String, Array[String]), (String, Array[Float]))] =
  allArrays.map(row => ((row._1._1, row._1._2,
    row._1._3.map {
      case str: String => str
      case x => println("unknown data type " + x + " : "); ""
    }.toArray), row._2))

// Repartition, cache in serialized form, and force materialization.
dataRdd = dataRdd.partitionBy(new HashPartitioner(64)).persist(StorageLevel.MEMORY_ONLY_SER)
dataRdd.count()
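One way to check whether Kryo is actually producing smaller output for this record shape, without our tables, is to serialize a single record both ways outside Spark and compare sizes. A self-contained sketch (the sample values are invented, and this exercises Kryo directly rather than Spark's KryoSerializer wrapper):

import java.io.ByteArrayOutputStream
import java.io.ObjectOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

// One record shaped like the elements of the persisted RDD.
val record = ((Array(1L, 2L, 3L), "someClass", Array("cut1", "cut2")),
              ("someType", Array(1.0f, 2.0f, 3.0f)))

// Size under Java serialization.
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(record)
oos.close()
println("Java serialization: " + bos.size + " bytes")

// Size under Kryo. Note that unregistered classes fall back to
// writing their full class names, inflating the output.
val kryo = new Kryo()
val out = new Output(new ByteArrayOutputStream())
kryo.writeClassAndObject(out, record)
println("Kryo serialization: " + out.total + " bytes")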