There is a heartbeat stream pattern that you can use: Create a service
(perhaps a thread in your driver) that pushes a heartbeat event to a
different stream every N seconds. Consume that stream as well in your
streaming application, and perform an action on every heartbeat.
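
A rough sketch of one self-contained way to do this in Spark Streaming's
Java API, using a custom receiver instead of an external queue. Treat it
as an illustration under my own assumptions; HeartbeatReceiver, the
interval parameter, and the timestamp payload are made up for the
example, not anything Spark provides out of the box:

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;

    // Emits one heartbeat (the current timestamp) every intervalMs.
    public class HeartbeatReceiver extends Receiver<Long> {
        private final long intervalMs;

        public HeartbeatReceiver(long intervalMs) {
            super(StorageLevel.MEMORY_ONLY());
            this.intervalMs = intervalMs;
        }

        @Override
        public void onStart() {
            // Background thread pushes a heartbeat into the stream
            // every interval until the receiver is stopped.
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!isStopped()) {
                        store(System.currentTimeMillis());
                        try {
                            Thread.sleep(intervalMs);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            }).start();
        }

        @Override
        public void onStop() {
            // The loop above exits once isStopped() returns true.
        }
    }

You would then create the heartbeat stream next to your data stream,
e.g. jssc.receiverStream(new HeartbeatReceiver(10000)), and call
foreachRDD on it to trigger your periodic action.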

This heartbeat pattern has worked well in Storm; the implementation
details differ, but it might work in Spark Streaming as well.

Alternatively, since Spark Streaming internally uses microbatching, you
might be able to take advantage of the existing batching. You can try
foreachRDD on the stream and see whether your function gets called even
when the microbatch is empty. If it is, and assuming your batch
intervals can add up to 10 seconds, you can implement the heartbeat
functionality internally. If you are very lucky,
myStream.window(Seconds(10)).foreachRDD({ ... }) might even do it for
you.
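
Translated to the Java API you are using, that experiment would look
roughly like the sketch below. It is untested and reuses sortedMessages
and violationChecking() from your snippet; the assumption to verify is
that foreachRDD really is invoked for every 10-second window, including
empty ones:

    // Same imports as your snippet, plus org.apache.spark.streaming.Durations.
    sortedMessages
            .window(Durations.seconds(10), Durations.seconds(10))
            .foreachRDD(
                    new Function<JavaRDD<Tuple5<Long, Integer, String, Integer, Integer>>, Void>() {
                        @Override
                        public Void call(
                                JavaRDD<Tuple5<Long, Integer, String, Integer, Integer>> rdd)
                                throws Exception {
                            // Expected to run once per 10-second window,
                            // even when rdd is empty.
                            violationChecking(rdd.collect());
                            return null;
                        }
                    });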

If you do experiments along those lines, please share the results with the list!

Regards,

Lars Albertsson



On Thu, Oct 22, 2015 at 10:48 PM, Nipun Arora <nipunarora2...@gmail.com> wrote:
> Hi,
> In general, in Spark Streaming one can do transformations (filter, map, etc.) or
> output operations (collect, forEach, etc.) in an event-driven paradigm, i.e.
> the action happens only if a message is received.
>
> Is it possible to do actions every few seconds in a polling-based fashion,
> regardless of whether a new message has been received?
>
> In my use case, I am filtering a stream and then doing operations on this
> filtered stream. However, I would like to do operations on the data in the
> stream every few seconds even if no message has been received in the stream.
>
> For example I have the following stream, where the violationChecking()
> function is being called only when a micro-batch is finished. Essentially
> this also means that I must receive a message in this stream to do
> processing. Is there any way that I can do the same operation every 10
> seconds or so? :
>
>     sortedMessages.foreach(
>             new Function<JavaRDD<Tuple5<Long, Integer, String, Integer, Integer>>, Void>() {
>                 @Override
>                 public Void call(JavaRDD<Tuple5<Long, Integer, String, Integer, Integer>> tuple5JavaRDD)
>                         throws Exception {
>                     List<Tuple5<Long, Integer, String, Integer, Integer>> list = tuple5JavaRDD.collect();
>                     violationChecking(list);
>                     return null;
>                 }
>             }
>     );
>
>
> Thanks,
>
> Nipun
