Yes, I have just found that. By replacing,
rdd.map(_._1).distinct().foreach {
  case (game, category) => persist(game, category, minTime, maxTime, rdd)
}
with,
rdd.map(_._1).distinct().collect().foreach {
  case (game, category) => persist(game, category, minTime, maxTime, rdd)
}
everything works as expected.
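
The difference makes sense: `collect()` brings the distinct keys back to the driver, so the subsequent `foreach` runs driver-side, where `rdd` still holds a live `SparkContext`; the original `rdd.foreach` serialised the closure (including the captured `rdd`) out to executors, where the context is a transient field and comes back null. Here is a minimal plain-Scala sketch of that mechanism, with no Spark dependency; `FakeRdd`, `Context` and `roundTrip` are made-up names for illustration only:

```scala
import java.io._

// Stand-in for SparkContext: only exists on the driver.
class Context extends Serializable { val name = "driver-context" }

// Stand-in for an RDD: like Spark's RDD, its context field is @transient,
// so it is dropped when the object is serialised into a task closure.
class FakeRdd(@transient val context: Context) extends Serializable

// Simulate what closure serialisation does: write the object out and
// read it back, as if shipping it to an executor.
def roundTrip[A <: Serializable](a: A): A = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(a)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  in.readObject().asInstanceOf[A]
}

val rdd = new FakeRdd(new Context)
assert(rdd.context != null)        // fine on the "driver" side
val onExecutor = roundTrip(rdd)    // what happens inside rdd.foreach { ... }
assert(onExecutor.context == null) // the @transient field did not survive
```

This is why calling anything that needs `data.sparkContext` from inside a distributed operation fails, while the same call works after `collect()` keeps execution on the driver.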
2015-01-21 14:18 GMT+00:00 Sean Owen <[email protected]>:
> It looks like you are trying to use the RDD in a distributed operation,
> which won't work. The context will be null.
> On Jan 21, 2015 1:50 PM, "Luis Ángel Vicente Sánchez" <
> [email protected]> wrote:
>
>> The SparkContext is lost when I call the persist function from the sink
>> function; just before the function call everything works as intended, so
>> I guess it's the FunctionN class serialisation that's causing the problem.
>> I will try to embed the functionality in the sink method to verify that.
>>
>> 2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez <
>> [email protected]>:
>>
>>> The following functions,
>>>
>>> def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>>>   data.foreachRDD { rdd =>
>>>     rdd.cache()
>>>     val (minTime, maxTime): (Long, Long) =
>>>       rdd.map {
>>>         case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
>>>       }.fold((Long.MaxValue, Long.MinValue)) {
>>>         case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
>>>       }
>>>     if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
>>>       rdd.map(_._1).distinct().foreach {
>>>         case (game, category) => persist(game, category, minTime, maxTime, rdd)
>>>       }
>>>     }
>>>     rdd.unpersist(blocking = false)
>>>   }
>>> }
>>>
>>> def persist(game: GameID, category: Category, min: Long, max: Long,
>>>             data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>>>   val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
>>>   val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
>>>     data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
>>>   val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
>>>     cas
>>>       .where(""""time" >= ?""", new Date(min))
>>>       .where(""""time" <= ?""", new Date(max))
>>>       .map {
>>>         case (date, time, platform, array) =>
>>>           ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
>>>       }
>>>   data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
>>>     case ((key, platform), (value, maybe)) =>
>>>       (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
>>>   }.saveToCassandra(parameters.table.keyspace, family)
>>> }
>>>
>>> are causing this exception at runtime:
>>>
>>> 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126)
>>> java.lang.NullPointerException
>>>     at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47)
>>>     at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
>>>     at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41)
>>>     at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>>>     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>>>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> I'm using Spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
>>> SparkContextFunctions.scala is the implicit CassandraConnector that uses
>>> the underlying SparkContext to retrieve the SparkConf.
>>>
>>> After a few hours debugging the code, the source of the problem is that,
>>>
>>> data.sparkContext
>>>
>>> is returning null. It seems that the RDD is serialised and the
>>> SparkContext is lost. Is this the expected behaviour? Is it a known bug?
>>>
>>> I have run out of ideas on how to make this work, so I'm open to
>>> suggestions.
>>>
>>> Kind regards,
>>>
>>> Luis
>>>
>>
>>