http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

shows setting up your stream and calling .checkpoint(checkpointDir) inside
the functionToCreateContext.  It looks to me like you're setting up your
stream and calling checkpoint outside, after getOrCreate.

I'm not sure that's the issue (someone who knows checkpoints better than I
do should chime in), but that's the first thing I noticed.

On Wed, May 13, 2015 at 4:06 AM, ankurcha <achau...@brightcove.com> wrote:

> Hi,
>
> I have a simple application which fails with the following exception
> only when the application is restarted (i.e. the checkpointDir has
> entries from a previous execution):
>
> Exception in thread "main" org.apache.spark.SparkException:
> org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not
> been initialized
>         at
> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266
> )
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply
> (DStream.scala:287)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply
> (DStream.scala:287)
>         at scala.Option.orElse(Option.scala:257)
>         at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:28
> 4)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDSt
> ream.scala:38)
>         at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.sc
> ala:116)
>         at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.sc
> ala:116)
>         at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLik
> e.scala:251)
>         at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLik
> e.scala:251)
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.sca
> la:59)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251
> )
>         at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>         at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:
> 116)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.app
> ly(JobGenerator.scala:227)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.app
> ly(JobGenerator.scala:222)
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.s
> cala:33)
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.s
> cala:222)
>         at
> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.sca
> la:90)
>         at
> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.sca
> la:67)
>         at
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala
> :512)
>         at
> com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.s
> cala:115)
>         at
> com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:1
> 5)
>         at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>         at
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>         at scala.App$$anonfun$main$1.apply(App.scala:71)
>         at scala.App$$anonfun$main$1.apply(App.scala:71)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableF
> orwarder.scala:32)
>         at scala.App$class.main(App.scala:71)
>         at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
>         at com.brightcove.analytics.tacoma.Main.main(Main.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav
> a:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessor
> Impl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit
> $$runMain(SparkSubmit.scala:569)
>         at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>         at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> The relevant source is:
>
> class RawLogProcessor(ssc: StreamingContext, topic: String,
> kafkaParams: Map[String, String]) {
>  // create kafka stream
>  val rawlogDStream = KafkaUtils.createDirectStream[String, Object,
> StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic))
>  //KafkaUtils.createStream[String, Object, StringDecoder,
> KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10),
> StorageLevel.MEMORY_AND_DISK_2)
>
>  val eventStream = rawlogDStream
>    .map({
>      case (key, rawlogVal) =>
>        val record = rawlogVal.asInstanceOf[GenericData.Record]
>        val rlog = RawLog.newBuilder()
>          .setId(record.get("id").asInstanceOf[String])
>          .setAccount(record.get("account").asInstanceOf[String])
>          .setEvent(record.get("event").asInstanceOf[String])
>          .setTimestamp(record.get("timestamp").asInstanceOf[Long])
>          .setUserAgent(record.get("user_agent").asInstanceOf[String])
>
> .setParams(record.get("params").asInstanceOf[java.util.Map[String,
> String]])
>          .build()
>        val norm = Normalizer(rlog)
>        (key, rlog.getEvent, norm)
>    })
>
>  val videoViewStream = eventStream
>    .filter(_._2 == "video_view")
>    .filter(_._3.isDefined)
>    .map((z) => (z._1, z._3.get))
>    .map((z) => (z._1, z._2.asInstanceOf[VideoView]))
>    .cache()
>
>  // repartition by (deviceType, DeviceOS)
>  val deviceTypeVideoViews = videoViewStream.map((v) =>
> ((v._2.getDeviceType, v._2.getDeviceOs), 1))
>    .reduceByKeyAndWindow(_ + _, Durations.seconds(10))
>    .print()
> }
>
> object RawLogProcessor extends Logging {
>
>  /**
>   * If str is surrounded by quotes it returns the content between the
> quotes
>   */
>  def unquote(str: String) = {
>    if (str != null && str.length >= 2 && str.charAt(0) == '\"' &&
> str.charAt(str.length - 1) == '\"')
>      str.substring(1, str.length - 1)
>    else
>      str
>  }
>
>  val checkpointDir = "/tmp/checkpointDir_tacoma"
>  var sparkConfig: Config = _
>  var ssc: StreamingContext = _
>  var processor: Option[RawLogProcessor] = None
>
>  val createContext: () => StreamingContext = () => {
>    val batchDurationSecs =
> sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS)
>    val sparkConf = new SparkConf()
>    sparkConf.registerKryoClasses(Array(classOf[VideoView],
> classOf[RawLog], classOf[VideoEngagement], classOf[VideoImpression]))
>    sparkConfig.entrySet.asScala
>      .map(kv => kv.getKey -> kv.getValue)
>      .foreach {
>        case (k, v) =>
>          val value = unquote(v.render())
>
>          logInfo(s"spark.$k = $value")
>
>          sparkConf.set(s"spark.$k", value)
>      }
>
>    // calculate sparkContext and streamingContext
>    new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs))
>  }
>
>  def createProcessor(sparkConf: Config, kafkaConf: Config):
> RawLogProcessor = {
>    sparkConfig = sparkConf
>    ssc = StreamingContext.getOrCreate(checkpointPath = checkpointDir,
> creatingFunc = createContext, createOnError = true)
>    ssc.checkpoint(checkpointDir)
>    // kafkaProperties
>    val kafkaParams = kafkaConf.entrySet.asScala
>      .map(kv => kv.getKey -> unquote(kv.getValue.render()))
>      .toMap
>
>    logInfo(s"Initializing kafkaParams = $kafkaParams")
>    // create processor
>    new RawLogProcessor(ssc, kafkaConf.getString("rawlog.topic"),
> kafkaParams)
>  }
>
>  def apply(sparkConfig: Config, kafkaConf: Config) = {
>    if (processor.isEmpty) {
>      processor = Some(createProcessor(sparkConfig, kafkaConf))
>    }
>    processor.get
>  }
>
>  def start() = {
>    ssc.start()
>    ssc.awaitTermination()
>  }
>
> }
>
> Extended logs:
> https://gist.githubusercontent.com/ankurcha/f35df63f0d8a99da0be4/raw/ec9
> 6b932540ac87577e4ce8385d26699c1a7d05e/spark-console.log
>
> Could someone tell me what causes this problem? I tried looking at
> the stacktrace but I am not familiar enough with the codebase to make
> solid assertions.
> Any ideas as to what may be happening here?
>
> --- Ankur Chauhan
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/kafka-Spark-Streaming-with-checkPointing-fails-to-restart-tp22864.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to