Just out of curiosity, I would like to know why a streaming program should shut down when no new data is arriving. I think it should keep waiting for new records to arrive.
Thanks
Ashutosh

On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat <hemant9...@gmail.com> wrote:

> A guess - parseRecord is returning None in some case (probably empty
> lines). And then entry.get is throwing the exception.
>
> You may want to filter the None values from accessLogDStream before you
> run the map function over it.
>
> Hemant
>
> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
> www.snappydata.io
>
> On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Which line is line 42 in your code?
>>
>> When the variable lines becomes empty, you can stop your program.
>>
>> Cheers
>>
>> On Feb 23, 2016, at 12:25 AM, Femi Anthony <femib...@gmail.com> wrote:
>>
>> I am working with the Spark Streaming API, and I wish to stream a set of
>> pre-downloaded web log files continuously to simulate a real-time stream. I
>> wrote a script that gunzips the compressed logs and pipes the output to nc
>> on port 7777.
>>
>> The script looks like this:
>>
>> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
>> zipped_files=`find $BASEDIR -name "*.gz"`
>>
>> for zfile in $zipped_files
>> do
>>   echo "Unzipping $zfile..."
>>   gunzip -c $zfile | nc -l -p 7777 -q 20
>> done
>>
>> I have streaming code written in Scala that processes the streams. It
>> works well for the most part, but when it runs out of files to stream I get
>> the following error in Spark:
>>
>> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
>> Restarting receiver with delay 2000 ms: Socket data stream had no more data
>> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
>> Restarting receiver with delay 2000ms: Socket data stream had no more data
>> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated
>> to only 0 peer(s) instead of 1 peers
>> ....
>> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
>> java.util.NoSuchElementException: None.get
>>     at scala.None$.get(Option.scala:313)
>>     at scala.None$.get(Option.scala:311)
>>     at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>>     at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>>
>> How do I implement a graceful shutdown so that the program exits
>> gracefully when it no longer detects any data in the stream?
>>
>> My Spark Streaming code looks like this:
>>
>> import org.apache.log4j.Logger
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> import org.apache.spark.streaming.dstream.DStream
>>
>> object StreamingLogEnhanced {
>>   def main(args: Array[String]) {
>>     val master = args(0)
>>     val conf = new SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>>     // Create a StreamingContext with a 10-second batch size
>>     val ssc = new StreamingContext(conf, Seconds(10))
>>     val log = Logger.getLogger(getClass.getName)
>>
>>     sys.ShutdownHookThread {
>>       log.info("Gracefully stopping Spark Streaming Application")
>>       ssc.stop(true, true)
>>       log.info("Application stopped")
>>     }
>>     // Create a DStream from all the input on port 7777
>>     val lines = ssc.socketTextStream("localhost", 7777)
>>     // Create a count of log hits by ip
>>     val ipCounts = countByIp(lines)
>>     ipCounts.print()
>>
>>     // start our streaming context and wait for it to "finish"
>>     ssc.start()
>>     // Wait for up to 6,000 seconds (10000 ms x 600), then exit
>>     ssc.awaitTermination(10000 * 600)
>>     ssc.stop()
>>   }
>>
>>   def countByIp(lines: DStream[String]) = {
>>     val parser = new AccessLogParser
>>     val accessLogDStream = lines.map(line => parser.parseRecord(line))
>>     val ipDStream = accessLogDStream.map(entry => (entry.get.clientIpAddress, 1))
>>     ipDStream.reduceByKey((x, y) => x + y)
>>   }
>> }
>>
>> Thanks for any suggestions in advance.
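For what it's worth, here is a minimal sketch of the filtering Hemant suggests, assuming parser.parseRecord returns an Option[AccessLogRecord] (AccessLogParser and clientIpAddress are taken from the quoted code):

import org.apache.spark.streaming.dstream.DStream

def countByIp(lines: DStream[String]) = {
  val parser = new AccessLogParser
  // flatMap unwraps Some(record) and silently drops the None results
  // from unparseable (e.g. empty) lines, so no .get is needed below
  val accessLogDStream = lines.flatMap(line => parser.parseRecord(line))
  val ipDStream = accessLogDStream.map(entry => (entry.clientIpAddress, 1))
  ipDStream.reduceByKey((x, y) => x + y)
}

Since None.get is exactly what line 42 throws, replacing map plus .get with flatMap should make the empty input at the end of the stream harmless.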
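As for exiting once the stream goes idle, one possible approach (a sketch, not the only way) is to count consecutive empty batches with a StreamingListener and stop the context from the main thread once a threshold is reached. This assumes a Spark version where BatchInfo.numRecords is available (1.4+); the threshold of 3 empty batches is arbitrary:

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

val emptyBatches = new AtomicInteger(0)
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    // Count batches that carried no records; reset as soon as data shows up
    if (batch.batchInfo.numRecords == 0L) emptyBatches.incrementAndGet()
    else emptyBatches.set(0)
  }
})

ssc.start()
// Poll instead of a fixed awaitTermination timeout: wake up every 10 s and
// exit the loop once the context has stopped or 3 batches in a row were empty
while (!ssc.awaitTerminationOrTimeout(10000) && emptyBatches.get() < 3) {}
// Graceful stop: let queued batches finish before tearing down
ssc.stop(stopSparkContext = true, stopGracefully = true)

Calling ssc.stop from inside the listener callback can deadlock, which is why the counter is polled from the main thread instead.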