We are trying to use Kryo serialization, but with Kryo serialization enabled
the memory consumption does not change. We have tried this on multiple data
sets. We have also checked the Kryo trace logs and confirmed that Kryo is
being used.
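
For reference, we enable Kryo along these lines (a minimal sketch; the
registrator class name and the specific registrations are illustrative, not
our exact code):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import com.esotericsoftware.kryo.Kryo

// Illustrative registrator: registers the classes that end up in the RDD.
class OurKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Array[Long]])
    kryo.register(classOf[Array[Float]])
    kryo.register(classOf[Array[String]])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "OurKryoRegistrator")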

Can somebody please help us with this?

The script used is given below. 
SCRIPT
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable.Buffer
import scala.Array
import scala.math.Ordering.Implicits._ 

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.RangePartitioner
import org.apache.spark.HashPartitioner

// For Kryo logging
import com.esotericsoftware.minlog.Log
import com.esotericsoftware.minlog.Log._
Log.set(LEVEL_TRACE)

val query = "select array(level_1, level_2,  level_3, level_4, level_5,
level_6, level_7, level_8, level_9, 

level_10, level_11, level_12, level_13, level_14, level_15, level_16,
level_17, level_18, level_19, level_20, 

level_21, level_22, level_23, level_24, level_25) as unitids, class, cuts,
type, data from table1 p join table2 b on 

(p.UnitId = b.unit_id) where runid = 912 and b.snapshotid = 220 and p.UnitId
= b.unit_id"

val rows: RDD[((Buffer[Any], String, Buffer[Any]), (String, Buffer[Any]))] =
  sc.sql2rdd(query).map(row =>
    ((row.getList("unitids").asInstanceOf[java.util.List[Any]].asScala,
      row.getString("class"),
      row.getList("cuts").asInstanceOf[java.util.List[Any]].asScala),
     (row.getString("type"),
      row.getList("data").asInstanceOf[java.util.List[Any]].asScala)))

// Convert the "data" column from Hive writables to Array[Float].
var rows2Array: RDD[((Buffer[Any], String, Buffer[Any]), (String, Array[Float]))] =
  rows.map(row => (row._1,
    (row._2._1, row._2._2.map {
      case floatWritable: org.apache.hadoop.io.FloatWritable =>
        floatWritable.get
      case lazyFloat: org.apache.hadoop.hive.serde2.`lazy`.LazyFloat =>
        lazyFloat.getWritableObject().get
      case y => println("unknown data type " + y); 0f
    }.toArray)))

// Convert the "unitids" column from Hive writables to Array[Long].
var allArrays: RDD[((Array[Long], String, Buffer[Any]), (String, Array[Float]))] =
  rows2Array.map(row =>
    ((row._1._1.map {
        case longWritable: org.apache.hadoop.io.LongWritable =>
          longWritable.get
        case lazyLong: org.apache.hadoop.hive.serde2.`lazy`.LazyLong =>
          lazyLong.getWritableObject().get
        case x => println("unknown data type " + x); 0L
      }.toArray, row._1._2, row._1._3), row._2))

// Convert the "cuts" column to Array[String].
var dataRdd: RDD[((Array[Long], String, Array[String]), (String, Array[Float]))] =
  allArrays.map(row => ((row._1._1, row._1._2,
    row._1._3.map {
      case str: String => str
      case x => println("unknown data type " + x); ""
    }.toArray), row._2))

dataRdd = dataRdd
  .partitionBy(new HashPartitioner(64))
  .persist(StorageLevel.MEMORY_ONLY_SER)

dataRdd.count()
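In case it helps with diagnosing this, a sketch of the comparison we can try
next: persist an identical copy of the RDD deserialized, so the web UI's
Storage tab shows both footprints side by side (the map(identity) is only
there to obtain a second, independently persisted RDD):

// Deserialized copy for comparison against the MEMORY_ONLY_SER footprint.
val dataRddDeser = dataRdd.map(identity).persist(StorageLevel.MEMORY_ONLY)
dataRddDeser.count()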




