In the training bolt:

    @Override
    public void execute(Tuple tuple, BasicOutputCollector outputCollector) {
        // lots of stuff here
        // emit the retrained model on its own named stream
        outputCollector.emit("updated model", new Values(model));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // the stream id here must match the one used in emit()
        outputFieldsDeclarer.declareStream("updated model", new Fields("model"));
    }

In your topology config:

    builder.setBolt(BLAH BLAH)
        .shuffleGrouping(your normal stuff)
        .shuffleGrouping(your training bolt, "updated model");

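In your bolt that takes both in:

    @Override
    public void execute(Tuple tuple, BasicOutputCollector outputCollector) {
        // compare with equals(); == on Strings compares references in Java
        if ("updated model".equals(tuple.getSourceStreamId())) {
            // tuple carries a new model
        } else {
            // tuple is a normal event
        }
    }
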
That is one way to go about it.
Note that I just typed this code in, so I can't guarantee it will work
as-is.
Or you could ditch the stream and just check the source component for the
name of your training bolt.
That can be more fragile than streams, though, if you change components
around.
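
To make the two options concrete, here is a rough sketch of both checks
side by side. It is only an illustration: SourceInfo is a stand-in I made
up that mirrors Tuple.getSourceStreamId() and Tuple.getSourceComponent(),
so the snippet compiles without Storm on the classpath, and the ids
"training-bolt" and "aggregation-bolt" are hypothetical placeholders for
whatever names you pass to setBolt().

```java
class ModelRouting {
    // Must match the stream id declared by the training bolt.
    static final String MODEL_STREAM = "updated model";
    // Hypothetical component id; use whatever you passed to setBolt().
    static final String TRAINING_BOLT_ID = "training-bolt";

    /** Stand-in for the two Storm Tuple accessors used in this sketch. */
    interface SourceInfo {
        String getSourceStreamId();
        String getSourceComponent();
    }

    /** Option 1: decide by the stream the tuple arrived on. */
    static boolean isModelByStream(SourceInfo tuple) {
        // equals() on the constant is null-safe and compares contents
        return MODEL_STREAM.equals(tuple.getSourceStreamId());
    }

    /** Option 2: decide by the component that emitted the tuple. */
    static boolean isModelByComponent(SourceInfo tuple) {
        return TRAINING_BOLT_ID.equals(tuple.getSourceComponent());
    }

    /** Small helper that builds a fake tuple source for the demo. */
    static SourceInfo source(final String streamId, final String componentId) {
        return new SourceInfo() {
            @Override
            public String getSourceStreamId() { return streamId; }
            @Override
            public String getSourceComponent() { return componentId; }
        };
    }

    public static void main(String[] args) {
        // "default" is the stream id Storm uses when none is given.
        SourceInfo event = source("default", "aggregation-bolt");
        SourceInfo model = source(MODEL_STREAM, TRAINING_BOLT_ID);

        System.out.println(isModelByStream(event));    // false
        System.out.println(isModelByStream(model));    // true
        System.out.println(isModelByComponent(event)); // false
        System.out.println(isModelByComponent(model)); // true
    }
}
```

With the stream check, renaming or replacing the training bolt does not
touch the prediction bolt; with the component check, the id string has to
be kept in sync with the topology by hand.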
On Tue, Feb 25, 2014 at 7:50 AM, Klausen Schaefersinho <
[email protected]> wrote:
> Hi,
>
> What do you mean by sending it in another stream? Can you give me an
> example?
>
>
>
> On Mon, Feb 24, 2014 at 5:16 PM, Dilum Ranatunga <[email protected]
> > wrote:
>
>> Do
>>
>> - Tuple.getSourceComponent(),
>> - Tuple.getSourceTask(),
>> - Tuple.getSourceStreamId()
>>
>> do the trick?
>>
>>
>> http://storm.incubator.apache.org/apidocs/backtype/storm/tuple/Tuple.html#getSourceComponent()
>>
>> I would send the model on a different stream.
>>
>>
>> On Mon, Feb 24, 2014 at 8:13 AM, Klausen Schaefersinho <
>> [email protected]> wrote:
>>
>>> Hi,
>>>
>>> I have a topology which processes events, aggregates them in some form,
>>> and performs some prediction based on a machine learning (ML) model. Every
>>> x events, one of the bolts involved in the normal processing emits a
>>> "trainModel" event, which is routed to a bolt dedicated to the
>>> training. Once the training is done, the new model should be sent back
>>> to the prediction bolt. The topology looks like:
>>>
>>>
>>> InputSpout -> AggregationBolt -> PredictionBolt -> OutputBolt
>>>                     |                   /\
>>>                     \/                  |
>>>               TrainingBolt -------------+
>>>
>>>
>>>
>>> This all works fine, but I am now not sure how I can get the model back.
>>>
>>> My first idea was to connect the normal prediction bolt to the training
>>> bolt, but this does not work, because the input tuple can contain either
>>> an event or a model. So I wrote something like:
>>>
>>> input.getBinaryByField("model");
>>>
>>> An exception is thrown if the bolt was triggered by an event. Is there a
>>> way to distinguish from which bolt / stream the current bolt was triggered?
>>>
>>>
>>> Cheers,
>>>
>>> Klaus
>>>
>>>
>>
>
--
This is not a signature