Is there a particular reason you're calling checkpoint on the stream in
addition to the streaming context?
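For comparison, here is a minimal sketch of the factory with the per-stream `checkpoint(Seconds(60))` call dropped. Only the context-level `ssc.checkpoint(checkpointDir)` remains. It assumes Spark 1.5 with the spark-streaming-kafka artifact on the classpath, and it reuses the brokers, topic and paths from the original mail. The object name `LoaderSketch` is hypothetical. The sketch also makes a second change: it obtains the `SQLContext` inside `foreachRDD` via `SQLContext.getOrCreate(rdd.sparkContext)`, the pattern the Spark Streaming guide suggests for recoverable applications, instead of capturing one created in the factory. It is untested against this setup and is not a confirmed fix for the EOF message.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical name; same factory shape as in the original mail.
object LoaderSketch {
  def createStreamingContext(conf: SparkConf, checkpointDir: String)(): StreamingContext = {
    val ssc = new StreamingContext(conf, Seconds(60))
    // Only the context-level checkpoint directory is set. Spark writes metadata
    // checkpoints here, and data checkpoints for any stateful operations.
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
      "auto.offset.reset" -> "smallest")
    val topics = Set("topic-p03-r01")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // No stream.checkpoint(...) call here.
    stream.foreachRDD { (rdd, time) =>
      // Look up (or lazily create) the SQLContext from the RDD's SparkContext,
      // so nothing created in this factory is captured by the closure.
      val sqlc = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlc.implicits._
      rdd.toDF()
        .write
        .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
    }

    ssc
  }
}
```

It would be wired up exactly as before, with `StreamingContext.getOrCreate(checkpointDir, LoaderSketch.createStreamingContext(sparkConf, checkpointDir))`.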

On Thu, Sep 17, 2015 at 2:36 PM, Petr Novak <oss.mli...@gmail.com> wrote:

> Hi all,
> it throws "FileBasedWriteAheadLogReader: Error reading next item, EOF
> reached":
> java.io.EOFException
>   at java.io.DataInputStream.readInt(DataInputStream.java:392)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
>
> WAL is not enabled in the config (spark.streaming.receiver.writeAheadLog.enable
> is left at its default, false).
>
> The code follows the examples and is kept simple for testing (I'm aware that
> the file save isn't idempotent). Am I doing something wrong there? It was
> tried on Spark 1.5.0.
>
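> import kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> object Loader {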
>   def main(args: Array[String]): Unit = {
>
>     val checkpointDir = "/dfs/spark/checkpoints"
>
>     val sparkConf = new SparkConf()
>       .setAppName("Spark Loader")
>       .setIfMissing("spark.master", "local[2]")
>       .setIfMissing("spark.streaming.kafka.maxRatePerPartition", "1000")
>
>     val ssc = StreamingContext.getOrCreate(
>       checkpointDir,
>       createStreamingContext(sparkConf, checkpointDir))
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
>   def createStreamingContext(conf: SparkConf, checkpointDir: String)(): StreamingContext = {
>     val ssc = new StreamingContext(conf, Seconds(60))
>
>     val sc = ssc.sparkContext
>     val sqlc = new SQLContext(sc)
>
>     ssc.checkpoint(checkpointDir)
>
>     import sqlc.implicits._
>
>     val kafkaParams = Map[String, String](
>       "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
>       "auto.offset.reset" -> "smallest")
>
>     val topics = Set("topic-p03-r01")
>
>     val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, topics)
>
>     stream
>       .checkpoint(Seconds(60))
>       .foreachRDD { (rdd, time) =>
>         rdd.toDF()
>           .write
>           .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
>       }
>
>     ssc
>   }
> }
>
>
> Many thanks for any ideas,
> Petr
>
