Hello,

I am trying to implement something like "process a stream for N
seconds, then return a result" with Spark Streaming (built from git
head). My approach (which is probably not very elegant) is

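    import org.apache.spark.streaming.StreamingContext
    import scala.concurrent.future
    import scala.concurrent.ExecutionContext.Implicits.global

    val ssc = new StreamingContext(...)
    ssc.start()
    // Stop the context from a background thread after N seconds.
    future {
      Thread.sleep(N * 1000L)  // Thread.sleep takes milliseconds
      ssc.stop(true)           // true: also stop the underlying SparkContext
    }
    ssc.awaitTermination()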

and in fact, this stops the stream processing. However, I get the
following error messages:

    14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
    14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999
    14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
    14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found

(where localhost:9999 is the source I am reading the stream from).
This doesn't actually seem like the proper way to do it. Can anyone
point me to how to implement "stop after N seconds" without these
error messages?

Thanks
Tobias
