Is there a particular reason you're calling checkpoint on the stream in addition to the streaming context?
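If the goal is only driver recovery, that extra call may be redundant: as I understand it, with the direct Kafka stream the metadata checkpoint written by ssc.checkpoint already includes the consumed offset ranges, so lost batches can be recomputed straight from Kafka. A minimal, untested sketch of the same output step without the per-stream checkpoint (same names as in your code):

    stream.foreachRDD { (rdd, time) =>
      // On recovery the batch is rebuilt from the checkpointed Kafka
      // offsets, so no separate data checkpoint of the DStream is
      // forced here via stream.checkpoint(...).
      rdd.toDF()
        .write
        .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
    }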
On Thu, Sep 17, 2015 at 2:36 PM, Petr Novak <oss.mli...@gmail.com> wrote:
> Hi all,
> it throws FileBasedWriteAheadLogReader: Error reading next item, EOF reached
> java.io.EOFException
>         at java.io.DataInputStream.readInt(DataInputStream.java:392)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
>
> WAL is not enabled in config; it is the default, hence false.
>
> The code follows an example and is quite simple, just for testing (I'm
> aware that the file save isn't idempotent). Or do I have something wrong
> there? It was tried on Spark 1.5.0.
>
> object Loader {
>   def main(args: Array[String]): Unit = {
>
>     val checkpointDir = "/dfs/spark/checkpoints"
>
>     val sparkConf = new SparkConf()
>       .setAppName("Spark Loader")
>       .setIfMissing("spark.master", "local[2]")
>       .setIfMissing("spark.streaming.kafka.maxRatePerPartition", "1000")
>
>     val ssc = StreamingContext.getOrCreate(
>       checkpointDir,
>       createStreamingContext(sparkConf, checkpointDir))
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
>   def createStreamingContext(conf: SparkConf, checkpointDir: String)(): StreamingContext = {
>     val ssc = new StreamingContext(conf, Seconds(60))
>
>     val sc = ssc.sparkContext
>     val sqlc = new SQLContext(sc)
>
>     ssc.checkpoint(checkpointDir)
>
>     import sqlc.implicits._
>
>     val kafkaParams = Map[String, String](
>       "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
>       "auto.offset.reset" -> "smallest")
>
>     val topics = Set("topic-p03-r01")
>
>     val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, topics)
>
>     stream
>       .checkpoint(Seconds(60))
>       .foreachRDD { (rdd, time) =>
>         rdd.toDF()
>           .write
>           .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
>       }
>
>     ssc
>   }
> }
>
> Many thanks for any idea,
> Petr