Hi all,
if you don't want to read the wall of text below, in short, I want to
know if it is possible to get a superstep-based iteration on a possibly
unbounded DataStream in Flink in an efficient way and what general
concept(s) of synchronization you would suggest for that.
I would like to write a program that has different vertices (realized
just as Longs for now) in a graph which all store a keyed state and
communicate with each other with messages that arrive in an iterated
stream.
From the outside I would only get messages to add a certain vertex (or,
possibly in the future, to delete one, but that can be ignored for now),
together with some additional information specified by the program. Such
a message can be assumed to have the same form as any other message.
Everything else would happen in an iterated stream keyed by the vertex
to which a message is addressed. In a KeyedProcessFunction (or a
KeyedBroadcastProcessFunction if a BroadcastStream is used for
synchronization), a vertex can update its state and send new messages to
any other vertex (or itself), based on the received message(s) and its
own current state. The new messages would then be fed back into the iterated
stream. If no synchronization is done this works quite well, however it
doesn't produce helpful results for my problem since no order in which
the messages arrive can be guaranteed.
What I would optimally like to have is a Pregel-like superstep-based
iteration which runs on a batch of outside messages (here: vertex
additions) until no more messages are produced and after that repeats
that with the next batch of vertices either infinitely or until there
are no more new messages received. During the execution of each batch
all vertices (including older ones) can be activated again by receiving
a message and the state of each vertex should be preserved throughout
the execution of the program. The problem lies in how I can separate the
messages into supersteps in an iterative partitioned stream, similar to
the iterations in the DataSet API.
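To make the intended semantics concrete, here is a minimal plain-Java
sketch of the batch-wise superstep loop I have in mind. It does not use
any Flink API. The class SuperstepSketch, the Message type and the toy
vertex program (keep the smallest value seen and forward it to vertex
id / 2) are assumptions made up purely for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of batch-wise superstep iteration; not Flink API. */
final class SuperstepSketch {

    /** A message addressed to one vertex (hypothetical shape). */
    static final class Message {
        final long target;
        final long value;

        Message(long target, long value) {
            this.target = target;
            this.value = value;
        }
    }

    /** Per-vertex state; it survives across batches like keyed state would. */
    private final Map<Long, Long> state = new HashMap<>();

    /**
     * Toy vertex program (an assumption): keep the smallest value seen and,
     * if the state improved, forward that value to vertex id / 2.
     */
    private List<Message> compute(long vertex, List<Long> inbox) {
        long best = Collections.min(inbox);
        Long old = state.get(vertex);
        List<Message> out = new ArrayList<>();
        if (old == null || best < old) {
            state.put(vertex, best);
            if (vertex > 1) {
                out.add(new Message(vertex / 2, best));
            }
        }
        return out;
    }

    /** Runs one batch until no messages remain; returns the superstep count. */
    int runBatch(List<Message> batch) {
        List<Message> current = new ArrayList<>(batch);
        int supersteps = 0;
        while (!current.isEmpty()) {
            // Group by target vertex, roughly what keyBy would do.
            Map<Long, List<Long>> inboxes = new TreeMap<>();
            for (Message m : current) {
                inboxes.computeIfAbsent(m.target, k -> new ArrayList<>()).add(m.value);
            }
            // Messages produced now are delivered only in the next superstep.
            List<Message> next = new ArrayList<>();
            inboxes.forEach((vertex, inbox) -> next.addAll(compute(vertex, inbox)));
            current = next;
            supersteps++;
        }
        return supersteps;
    }

    Long stateOf(long vertex) {
        return state.get(vertex);
    }

    public static void main(String[] args) {
        SuperstepSketch sketch = new SuperstepSketch();
        System.out.println(sketch.runBatch(List.of(new Message(12, 12))));
        System.out.println(sketch.runBatch(List.of(new Message(5, 5))));
        System.out.println(sketch.stateOf(1));
    }
}
```

The point of the sketch is that each call to runBatch corresponds to one
batch of outside messages, that messages produced in a superstep are only
visible in the following one, and that the state map is kept between
batches so older vertices can be activated again.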
One idea I had was just making tumbling windows of a large enough amount
of time which would just collect all the messages and then emit them in
a ProcessWindowFunction once the window fires. While this would be quite
a simple solution that requires little non-parallel synchronization, it
would obviously require that we know a time span within which all
messages are guaranteed to have been processed and all new messages for
the next superstep produced, which is realistically not the case. It
would also mean that in most supersteps the program would wait longer
than necessary until it starts the next superstep. Fault tolerance would
also be very hard to achieve.
Another more complex idea was to just globally synchronize with an
object that remembers which vertices have been sent messages in the
previous superstep by being informed before any message is sent and then
is also informed when a vertex is done with processing a message and
informs the vertex if there globally are no more messages to be
processed. If that is the case the vertex then sends a NextSuperstep
message which is broadcast to all partitions with a BroadcastStream.
After that all vertices can start with processing all messages sent to
them in the previous superstep. Other than not being trivial to
synchronize without any problems (which I'm working on myself), this
approach has the obvious disadvantage that a lot of information has to
be passed to this object in a globally synchronized manner, which kind of
kills the point of parallel processing. Although it is obvious that some
global synchronization probably has to take place, this approach seems
rather inefficient to me.
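As a rough illustration of this second idea, here is a plain-Java sketch
of such a coordinator object. It is a simplified counting variant: it
only counts messages instead of remembering which vertices were
addressed. The class SuperstepCoordinator and all its method names are
hypothetical and not part of Flink.

```java
/** Plain-Java model of a global superstep coordinator; not Flink API. */
final class SuperstepCoordinator {
    private long inFlight;      // messages of the current superstep not yet processed
    private long queuedForNext; // messages already announced for the next superstep
    private int superstep;      // number of completed supersteps

    /** Registers external messages that start the first superstep. */
    synchronized void seed(long externalMessages) {
        inFlight += externalMessages;
    }

    /** Must be called before a message for the next superstep is sent. */
    synchronized void announceSend() {
        queuedForNext++;
    }

    /**
     * Called after a vertex has finished processing one message. Returns
     * true if that was the last message of the current superstep, in which
     * case the caller would broadcast the NextSuperstep message.
     */
    synchronized boolean messageProcessed() {
        if (inFlight <= 0) {
            throw new IllegalStateException("no message in flight");
        }
        inFlight--;
        if (inFlight > 0) {
            return false;
        }
        // Barrier reached: announced messages become the new superstep.
        inFlight = queuedForNext;
        queuedForNext = 0;
        superstep++;
        return true;
    }

    /** True if nothing is left to process, i.e. the batch has converged. */
    synchronized boolean isIdle() {
        return inFlight == 0 && queuedForNext == 0;
    }

    synchronized int completedSupersteps() {
        return superstep;
    }

    public static void main(String[] args) {
        SuperstepCoordinator c = new SuperstepCoordinator();
        c.seed(1);
        c.announceSend();
        System.out.println(c.messageProcessed());
        System.out.println(c.isIdle());
    }
}
```

Every send and every completed message goes through one synchronized
object here, which is exactly the bottleneck I am worried about.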
Since I haven't been working with Flink for very long, although I have
used it intensively for the past couple of weeks and read all relevant
documentation I could find, I would like to ask if someone has a
suggestion how to implement such a superstep-based iteration in the
DataStream API in the most efficient way with Flink and if you think this
is actually even a worthwhile endeavor. I would especially like to know
if Flink already provides classes, methods or concepts that would be
helpful for that.
Since our project isn't really close to a finished program yet and
consists mainly of various test programs, I cannot really show you a
complete code of what I already have, but if anyone has any specific
questions I can probably send you pseudocode or Java code for one of
these test programs to describe what I imagine. Also, since we are still
relatively open on how exactly we want to solve our original problem,
I'm also open to suggestions which solve only a similar problem, even
if they don't fully fit what I described above.
It's of course also possible that there is already a simple solution in
Flink which I somehow managed to overlook until now. In that case I'm
sorry for bothering you but I would still like to know what I should
look up.
Best, Christian
- Superstep-like synchronization of streaming iteration Christian Lehner