It looks like you are trying to use the RDD inside a distributed operation (the foreach over rdd.map(_._1).distinct() runs on the executors), which won't work: the SparkContext will be null there.
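One way around it, assuming the set of distinct (game, category) keys per batch is small enough to collect, is to bring those keys back to the driver and call persist from there; the foreachRDD body itself runs on the driver, so data.sparkContext stays valid. A rough sketch of your sink function with that single change (untested):

  def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
    data.foreachRDD { rdd =>
      rdd.cache()
      val (minTime, maxTime): (Long, Long) =
        rdd.map {
          case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
        }.fold((Long.MaxValue, Long.MinValue)) {
          case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
        }
      if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
        // collect() brings the distinct keys to the driver, so persist(), and the
        // data.sparkContext.cassandraTable call inside it, run on the driver
        rdd.map(_._1).distinct().collect().foreach {
          case (game, category) => persist(game, category, minTime, maxTime, rdd)
        }
      }
      rdd.unpersist(blocking = false)
    }
  }

The rdd passed into persist is only used to build further transformations and trigger saveToCassandra, so that part is fine once persist is invoked on the driver.

On Jan 21, 2015 1:50 PM, "Luis Ángel Vicente Sánchez" <[email protected]> wrote: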
> The SparkContext is lost when I call the persist function from the sink
> function; just before the function call everything works as intended, so I
> guess it's the FunctionN class serialisation that is causing the problem.
> I will try to embed the functionality in the sink method to verify that.
>
> 2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez <[email protected]>:
>
>> The following functions,
>>
>> def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>>   data.foreachRDD { rdd =>
>>     rdd.cache()
>>     val (minTime, maxTime): (Long, Long) =
>>       rdd.map {
>>         case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
>>       }.fold((Long.MaxValue, Long.MinValue)) {
>>         case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
>>       }
>>     if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
>>       rdd.map(_._1).distinct().foreach {
>>         case (game, category) => persist(game, category, minTime, maxTime, rdd)
>>       }
>>     }
>>     rdd.unpersist(blocking = false)
>>   }
>> }
>>
>> def persist(game: GameID, category: Category, min: Long, max: Long,
>>             data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
>>   val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
>>   val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
>>     data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
>>   val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
>>     cas
>>       .where(""""time" >= ?""", new Date(min))
>>       .where(""""time" <= ?""", new Date(max))
>>       .map {
>>         case (date, time, platform, array) =>
>>           ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
>>       }
>>   data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
>>     case ((key, platform), (value, maybe)) =>
>>       (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
>>   }.saveToCassandra(parameters.table.keyspace, family)
>> }
>>
>> are causing this exception at runtime:
>>
>> 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID 126)
>> java.lang.NullPointerException
>>     at com.datastax.spark.connector.SparkContextFunctions.cassandraTable$default$3(SparkContextFunctions.scala:47)
>>     at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
>>     at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:41)
>>     at com.mindcandy.services.mako.concurrentusers.ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(ActiveUsersJobImpl.scala:40)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>>     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
>>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> I'm using Spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
>> SparkContextFunctions.scala is the implicit CassandraConnector that uses
>> the underlying SparkContext to retrieve the SparkConf.
>>
>> After a few hours debugging the code, the source of the problem is that
>>
>> data.sparkContext
>>
>> is returning null. It seems that the RDD is serialised and the
>> SparkContext is lost. Is this the expected behaviour? Is it a known bug?
>>
>> I have run out of ideas on how to make this work, so I'm open to
>> suggestions.
>>
>> Kind regards,
>>
>> Luis
>>
>
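To the question above: as far as I know this is expected behaviour rather than a bug. An RDD's reference to its SparkContext is transient, so when an RDD is captured in the closure of another RDD's action it is serialised to the executors without it. Roughly (illustrative names, not your code):

  outer.foreach { element =>
    inner.sparkContext  // null: `inner` was deserialised on an executor,
                        // so its transient SparkContext reference is gone
  }

Anything that needs the SparkContext, such as cassandraTable, has to be called from the driver.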
