The following functions,
def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
  data.foreachRDD { rdd =>
    rdd.cache()
    // Compute the time range covered by this batch.
    val (minTime, maxTime): (Long, Long) =
      rdd.map {
        case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
      }.fold((Long.MaxValue, Long.MinValue)) {
        case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
      }
    if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
      // This closure is ActiveUsersJobImpl.scala:40-41 in the trace below.
      rdd.map(_._1).distinct().foreach {
        case (game, category) => persist(game, category, minTime, maxTime, rdd)
      }
    }
    rdd.unpersist(blocking = false)
  }
}
def persist(game: GameID, category: Category, min: Long, max: Long,
            data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
  val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
  // ActiveUsersJobImpl.scala:51: data.sparkContext turns out to be null here.
  val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
    data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](
      parameters.table.keyspace, family)
  // Existing counters in Cassandra for the batch's time range.
  val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
    cas
      .where(""""time" >= ?""", new Date(min))
      .where(""""time" <= ?""", new Date(max))
      .map {
        case (date, time, platform, array) =>
          ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
      }
  // Merge the new HLLs with any existing ones and write them back.
  data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
    case ((key, platform), (value, maybe)) =>
      (key.date, key.time, platform.repr,
        HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
  }.saveToCassandra(parameters.table.keyspace, family)
}
are causing this exception at runtime:
15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126)
java.lang.NullPointerException
    at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47)
    at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
    at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41)
    at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
I'm using Spark 1.1.1 and spark-cassandra-connector 1.1.1. Line 47 of
SparkContextFunctions.scala is the default value of the implicit
CassandraConnector parameter, which uses the underlying SparkContext to
retrieve the SparkConf.
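For reference, scalac compiles the default value of a method's Nth parameter
into a synthetic method named like foo$default$N, so the
cassandraTable$default$3 frame is the default for the third parameter (the
implicit CassandraConnector) being evaluated. A quick standalone check of
that naming scheme, with a made-up foo:

object DefaultArgDemo {
  def foo(a: Int, b: Int = 42): Int = a + b
  def main(args: Array[String]): Unit = {
    // scalac generates a public method foo$default$2 that computes the
    // default for foo's second parameter; this prints "foo$default$2 = 42".
    DefaultArgDemo.getClass.getMethods
      .filter(_.getName.startsWith("foo$default"))
      .foreach(m => println(s"${m.getName} = ${m.invoke(DefaultArgDemo)}"))
  }
}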
After a few hours debugging the code, the source of the problem appears to be that
data.sparkContext
returns null inside the foreach closure. It seems that the RDD is serialised
into the task and its SparkContext reference is lost on the way to the
executor. Is this the expected behaviour? Is it a known bug?
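As a sanity check, the following minimal snippet (no Cassandra involved)
seems to reproduce it; the names here are just illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextLostRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("repro").setMaster("local[2]"))
    val outer = sc.parallelize(1 to 3)
    val inner = sc.parallelize(1 to 3)
    outer.foreach { _ =>
      // inner was serialised into this closure; on the executor its
      // transient SparkContext does not survive, so this prints "null".
      println(s"inner.sparkContext = ${inner.sparkContext}")
    }
    sc.stop()
  }
}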
I have almost run out of ideas on how to make this work, so I'm open to
suggestions.
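The only thing left that I can think of is to collect the distinct keys on
the driver and call persist from there, along the lines of this untested
sketch, but I don't know whether that is the right approach:

if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
  // Untested idea: collect() brings the keys back to the driver, so persist
  // (and its use of data.sparkContext) runs in the driver JVM instead of
  // inside an executor task.
  rdd.map(_._1).distinct().collect().foreach {
    case (game, category) => persist(game, category, minTime, maxTime, rdd)
  }
}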
Kind regards,
Luis