Hello,

I am trying to implement something like "process a stream for N seconds, then return a result" with Spark Streaming (built from git head). My approach (which is probably not very elegant) is:

    val ssc = new StreamingContext(...)
    ssc.start()
    future {
      // Thread.sleep expects milliseconds, so convert the Duration
      Thread.sleep(Seconds(N).milliseconds)
      ssc.stop(true)
    }
    ssc.awaitTermination()

This does in fact stop the stream processing. However, I get the following error messages:

    14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
    14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999
    14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
    14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found

(where localhost:9999 is the source I am reading the stream from).

This does not seem like the proper way to do it. Can anyone point me to how to implement "stop after N seconds" without these error messages?

Thanks
Tobias
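
P.S. For reference, here is a minimal sketch of the same "stop after N seconds" idea without the background future. It assumes that the build in use provides `awaitTermination(timeout)` with a timeout in milliseconds and the two-argument `stop(stopSparkContext, stopGracefully)`. The object name, the `local[2]` master, the 1-second batch interval, the value of `runSeconds` and the `count().print()` step are illustrative placeholders, not part of the original program. Whether this avoids the ERROR lines above has not been verified.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver program; names and settings are placeholders.
object StopAfterNSeconds {
  def main(args: Array[String]): Unit = {
    val runSeconds = 10 // stands in for N

    val conf = new SparkConf()
      .setMaster("local[2]") // at least 2 threads: one for the receiver, one for processing
      .setAppName("StopAfterNSeconds")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Same source as in the post: a text stream on localhost:9999.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print() // placeholder processing step

    ssc.start()

    // Block the driver thread for at most N seconds
    // (assumes the timeout overload taking milliseconds is available).
    ssc.awaitTermination(Seconds(runSeconds).milliseconds)

    // Ask for a graceful stop so that data already received is processed
    // before the receivers and the SparkContext are shut down.
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```

The difference from the snippet above is that the driver thread itself waits with a timeout and then calls `stop`, instead of calling `stop` from a separate thread while the driver blocks in `awaitTermination()`.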