I have been experimenting with a data set with and without persisting the RDD
and have come across some unexpected results.  The files we are reading are
Avro files, so we are using the following to define the RDD; what we end up
with is an RDD[CleansedLogFormat]:

 import org.apache.avro.mapred.AvroKey
 import org.apache.avro.mapreduce.AvroKeyInputFormat
 import org.apache.hadoop.io.NullWritable
 import org.apache.spark.rdd.NewHadoopRDD

 // read the Avro files and unwrap each AvroKey into its datum
 val f = new NewHadoopRDD(sc,
      classOf[AvroKeyInputFormat[CleansedLogFormat]],
      classOf[AvroKey[CleansedLogFormat]],
      classOf[NullWritable],
      job.getConfiguration).map(_._1.datum())

f.count()
=> 110268763

f.persist(StorageLevel.MEMORY_AND_DISK).count()
=> 110268763

So far so good.  Both the persisted and non-persisted RDDs return the same
count.  Where things get weird is when I try to do a reduceByKey or other
"grouping" operation, something like:

import org.apache.spark.SparkContext._  // pair-RDD implicits for join/reduceByKey
import org.joda.time.DateTime

f.map(record => (record.getProviderId.toString, record))
  .join(bandwidthKv)
  .map { pair =>
    val hour = new DateTime(pair._2._1.getTimeStamp).toString("YYYYMMddHH")
    (hour, Set(pair._2._1.getGuid))
  }
  .reduceByKey(_ ++ _)
  .collect()
  .foreach { a => println(a._1 + ": " + a._2.size) }
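
For reference, the same per-hour distinct-GUID count can be computed without
the join; this sketch, using the same getTimeStamp and getGuid accessors, is
the simpler form of what the job above measures:

// sketch: distinct GUIDs per hour, skipping the bandwidthKv join
f.map(r => (new DateTime(r.getTimeStamp).toString("YYYYMMddHH"),
            r.getGuid.toString))
  .distinct()
  .countByKey()
  .foreach { case (hour, n) => println(hour + ": " + n) }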


We then get different results from the non-persisted vs. the persisted
version:

Non-persisted:
2014050917: 7
2014050918: 42

Persisted:
2014050917: 7
2014050918: 12
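
If it helps narrow things down, one check I can run is to sample the records
for the hour that disagrees and print their GUIDs, roughly like this (a
sketch; 2014050918 is the hour from the output above):

// sketch: sample GUIDs for the disputed hour to see whether the
// persisted copies repeat the same value
f.filter(r => new DateTime(r.getTimeStamp).toString("YYYYMMddHH") == "2014050918")
  .map(_.getGuid.toString)
  .take(50)
  .foreach(println)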

Any idea what could account for the differences?  BTW, I am using Spark
0.9.1.
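
One hypothesis I wondered about: Hadoop input formats commonly reuse the same
key object across records, so the datum returned by _._1.datum() might be one
mutable instance that persist() ends up capturing over and over.  If that were
the cause, would a defensive copy before persisting be the right fix?  A
sketch of what I mean (this assumes CleansedLogFormat is an Avro-generated
specific record, so newBuilder(record) exists):

// sketch: deep-copy each datum before caching, in case the record
// reader reuses the underlying Avro object
val fCopied = f.map(r => CleansedLogFormat.newBuilder(r).build())
fCopied.persist(StorageLevel.MEMORY_AND_DISK)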

Thanks,

Paul 




