Hi,
I would like to implement the following data processing scheme using Storm,
but it seems to me that the only natural way to do this is with a dynamic
topology, which I know Storm doesn't support. Is there a way to do this
with a static topology, or is this use case truly outside of Storm's scope?
*The Goal:*
The basic idea is that I have an input data stream that is processed in a
model. The model would then emit to the downstream topology. As the
topology is running, I would like to instantiate new, mutated clones of a
particular running model that consume the same input data stream as their
parent model. Additionally, I would like to deactivate models when they
are under-performing, based on the feedback from the downstream topology.
This would have the net effect of evolving a set of models over time, where
the fitness of each model is decided by the part of the topology downstream
of the model's output.
Because the input data rate is expected to be high, as is the processing
time per datum, I would need each model running concurrently and
potentially even parallelizing the individual models themselves.
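To make the intended evolve step concrete, here is a minimal, Storm-free
sketch (all names hypothetical): downstream feedback assigns each model a
fitness score, models below a threshold "die", and each survivor is cloned
with a mutation.

```python
import random

def evolve(models, fitness, die_below, mutate):
    """models: model_id -> params; fitness: model_id -> score from downstream."""
    # Deactivate under-performing models.
    survivors = {mid: p for mid, p in models.items() if fitness[mid] >= die_below}
    # Each survivor spawns one mutated clone that shares the input stream.
    next_gen = dict(survivors)
    for mid, params in survivors.items():
        clone_id = mid + "'"          # hypothetical naming scheme for clones
        next_gen[clone_id] = mutate(params)
    return next_gen

models = {"m1": 1.0, "m2": 5.0}
fitness = {"m1": 0.2, "m2": 0.9}
new = evolve(models, fitness, die_below=0.5,
             mutate=lambda p: p + random.uniform(-0.1, 0.1))
# "m1" is deactivated; "m2" survives and gains a mutated clone "m2'"
```

This is only the control logic; the hard part, as described below, is mapping
it onto Storm's static topologies.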
To me, the most natural way of accomplishing all this without regard to
Storm's limitations would be to encapsulate each model in its own bolt, and
then dynamically generate or destroy those bolts based on messages emitted
from the downstream topology. Such as:
Spout1 ----> {bolt_1, bolt_2, ... bolt_n} ----> BoltF
                       ^                          |
                       \------(*message_2*)-------/
where n is the number of model bolts, and *message_i* is a "replicate" or
"die" signal for bolt_i (in this case 2). When *message_i*=="replicate", I
want a new bolt to be created that executes a mutated clone of the model in
bolt_i. When *message_i*=="die", I want that bolt to be removed from the
topology.
Unless I am mistaken, such a scheme is not supported in Storm.
Two Storm-friendly (...Storm-ified?...Stormy?...) ways of doing this could
be as follows:
*Implementation 1)*
- Designate one bolt that handles the execution of all models; it could
be fed by another bolt. The model bolt is rebalanced on
"replicate/death" messages, so that numWorkers = numModels. This
(potentially?) could be triggered by the replicate/death message itself,
but I'm not sure if this violates a Storm principle because it would mean a
topology is rebalancing itself.
- Because there is no way to dynamically declare streams in Storm, a
model identifier would have to be included as a part of the emitted tuple
from the model bolt. This would then be used to route the tuple's data to
the routines in BoltF that are responsible for handling data that comes
from that particular model.
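The routing described in Implementation 1 could look roughly like the
following sketch (plain Python, hypothetical structure, not Storm API code):
each tuple carries its model's identifier, and BoltF dispatches on it to the
routine registered for that model.

```python
class BoltF:
    """Dispatches tuples to per-model routines, keyed by the model id
    embedded in each tuple, since streams can't be declared dynamically."""

    def __init__(self):
        self.handlers = {}            # model_id -> per-model routine/state

    def register(self, model_id, handler):
        self.handlers[model_id] = handler

    def execute(self, tup):
        # Tuples are emitted by the model bolt as (model_id, payload).
        model_id, data = tup
        self.handlers[model_id](data)

seen = []
f = BoltF()
f.register("model_7", lambda d: seen.append(("model_7", d)))
f.execute(("model_7", 42))
```

In real Storm terms the same idea would presumably use a fields grouping on
the model-id field so that all of one model's output lands on the same BoltF
task, but I have not verified the details.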
*Implementation 2)*
- Require that a topology can run only one model.
- "replicate" and "die" messages must be sent to to a manager that is
external to all topologies. He would then spin up a new topology with the
relevant model configurations.
- Issue 1: Data in the downstream topology (BoltF) needs to be persisted
across all topologies. This could be done with a centralized Redis db, but
this would then become a single point of failure.
- Issue 2: It's unlikely that this could scale well with the number of
models. If I wanted to simultaneously run thousands of models, having one
topology per model would likely be very expensive.
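For what it's worth, the external manager in Implementation 2 would amount
to something like this sketch (all names hypothetical; `submit` and `kill`
stand in for whatever actually launches or kills a topology, e.g. the
`storm` CLI):

```python
class TopologyManager:
    """Reacts to replicate/die messages by starting or killing the
    one-model-per-topology deployments described above."""

    def __init__(self, submit, kill, mutate):
        self.submit, self.kill, self.mutate = submit, kill, mutate
        self.running = {}             # model_id -> model configuration

    def handle(self, msg, model_id):
        if msg == "replicate":
            clone_id = model_id + "'"                     # hypothetical id scheme
            config = self.mutate(self.running[model_id])  # mutated clone config
            self.running[clone_id] = config
            self.submit(clone_id, config)
        elif msg == "die":
            self.kill(model_id)
            del self.running[model_id]

started, killed = [], []
mgr = TopologyManager(submit=lambda mid, cfg: started.append(mid),
                      kill=killed.append,
                      mutate=lambda cfg: dict(cfg, mutated=True))
mgr.running["m1"] = {"lr": 0.1}
mgr.handle("replicate", "m1")
mgr.handle("die", "m1")
```

Issues 1 and 2 above are exactly why I'm hesitant about this: the manager
and its shared state sit outside Storm's fault-tolerance guarantees.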
Any ideas are greatly appreciated.
Thanks!
Best,
Bryan