I had the same problem.
The solution I found was to use:
JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate("checkpoint_dir", contextFactory);

ALL configuration should be performed inside contextFactory. If you try to configure the streaming context after getOrCreate, you get the "has not been initialized" error.
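For what it's worth, here is a minimal sketch of that pattern in Scala (closer to the code quoted below). The setupStreams helper and the app name are placeholders; the rest is the standard StreamingContext API. On restart Spark rebuilds the DStream graph from the checkpoint and never calls the factory, so any DStreams created outside it end up uninitialized, which matches the exception below.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object CheckpointedApp {
  val checkpointDir = "/tmp/checkpointDir_tacoma"

  // Build the SparkConf, the StreamingContext AND the whole DStream graph here.
  // This function is only invoked when no usable checkpoint exists.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpointed-app") // placeholder name
    val ssc = new StreamingContext(conf, Durations.seconds(10))
    ssc.checkpoint(checkpointDir)
    setupStreams(ssc) // create all DStreams, transformations and output ops here
    ssc
  }

  // Placeholder: KafkaUtils.createDirectStream(...), map, reduceByKeyAndWindow, print, ...
  def setupStreams(ssc: StreamingContext): Unit = {
  }

  def main(args: Array[String]): Unit = {
    // On restart the graph is restored from the checkpoint and createContext is NOT called,
    // so nothing may be added to the graph after this line.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}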

On 13.05.2015 00:51, Ankur Chauhan wrote:
Hi,

I have a simple application which fails with the following exception only when the application is restarted (i.e. the checkpointDir has entries from a previous execution):

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not been initialized
        at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
        at com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.scala:115)
        at com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:15)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
        at scala.App$class.main(App.scala:71)
        at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
        at com.brightcove.analytics.tacoma.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The relevant source is:

class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) {
   // create kafka stream
   val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic))
   //KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2)

   val eventStream = rawlogDStream
     .map({
       case (key, rawlogVal) =>
         val record = rawlogVal.asInstanceOf[GenericData.Record]
         val rlog = RawLog.newBuilder()
           .setId(record.get("id").asInstanceOf[String])
           .setAccount(record.get("account").asInstanceOf[String])
           .setEvent(record.get("event").asInstanceOf[String])
           .setTimestamp(record.get("timestamp").asInstanceOf[Long])
           .setUserAgent(record.get("user_agent").asInstanceOf[String])
           .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
           .build()
         val norm = Normalizer(rlog)
         (key, rlog.getEvent, norm)
     })

   val videoViewStream = eventStream
     .filter(_._2 == "video_view")
     .filter(_._3.isDefined)
     .map((z) => (z._1, z._3.get))
     .map((z) => (z._1, z._2.asInstanceOf[VideoView]))
     .cache()

   // repartition by (deviceType, DeviceOS)
   val deviceTypeVideoViews = videoViewStream.map((v) => ((v._2.getDeviceType, v._2.getDeviceOs), 1))
     .reduceByKeyAndWindow(_ + _, Durations.seconds(10))
     .print()
}

object RawLogProcessor extends Logging {

   /**
    * If str is surrounded by quotes, it returns the content between the quotes
    */
   def unquote(str: String) = {
     if (str != null && str.length >= 2 && str.charAt(0) == '\"' && str.charAt(str.length - 1) == '\"')
       str.substring(1, str.length - 1)
     else
       str
   }

   val checkpointDir = "/tmp/checkpointDir_tacoma"
   var sparkConfig: Config = _
   var ssc: StreamingContext = _
   var processor: Option[RawLogProcessor] = None

   val createContext: () => StreamingContext = () => {
     val batchDurationSecs = sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS)
     val sparkConf = new SparkConf()
     sparkConf.registerKryoClasses(Array(classOf[VideoView], classOf[RawLog], classOf[VideoEngagement], classOf[VideoImpression]))
     sparkConfig.entrySet.asScala
       .map(kv => kv.getKey -> kv.getValue)
       .foreach {
         case (k, v) =>
           val value = unquote(v.render())

           logInfo(s"spark.$k = $value")

           sparkConf.set(s"spark.$k", value)
       }

     // calculate sparkContext and streamingContext
     new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs))
   }

   def createProcessor(sparkConf: Config, kafkaConf: Config): RawLogProcessor = {
     sparkConfig = sparkConf
     ssc = StreamingContext.getOrCreate(checkpointPath = checkpointDir, creatingFunc = createContext, createOnError = true)
     ssc.checkpoint(checkpointDir)
     // kafkaProperties
     val kafkaParams = kafkaConf.entrySet.asScala
       .map(kv => kv.getKey -> unquote(kv.getValue.render()))
       .toMap

     logInfo(s"Initializing kafkaParams = $kafkaParams")
     // create processor
     new RawLogProcessor(ssc, kafkaConf.getString("rawlog.topic"), kafkaParams)
   }

   def apply(sparkConfig: Config, kafkaConf: Config) = {
     if (processor.isEmpty) {
       processor = Some(createProcessor(sparkConfig, kafkaConf))
     }
     processor.get
   }

   def start() = {
     ssc.start()
     ssc.awaitTermination()
   }

}

Extended logs: 
https://gist.githubusercontent.com/ankurcha/f35df63f0d8a99da0be4/raw/ec96b932540ac87577e4ce8385d26699c1a7d05e/spark-console.log

Could someone tell me what causes this problem? I tried looking at the stack trace, but I am not familiar enough with the codebase to make solid assertions.
Any ideas as to what may be happening here?

--- Ankur Chauhan
