Yep, that's it, Nathan. I think I have my solution.
I have ...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
//builder.setBolt("print", new TestBolt()).shuffleGrouping("msgs");
builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
//Bolts for message routing
builder.setBolt("ODU", new ODUBolt()).fieldsGrouping(CSG_BOLT, "ODU", new Fields("ODU"));
builder.setBolt(ADU_BOLT, new ADUBolt()).fieldsGrouping(CSG_BOLT, "ADU", new Fields("ADU"));
builder.setBolt(ASNS_BOLT, new ASNSBolt()).fieldsGrouping(CSG_BOLT, "ASNS", new Fields("ASNS"));
and within the bolt:
@Override
public void execute(Tuple tuple, BasicOutputCollector outputCollector) {
    System.out.println("CSG Message received... ");
    byte[] data = tuple.getBinaryByField("bytes");
    //
    System.out.println("CSG Message length is: " + data.length);
    ReflectData reflectData = ReflectData.AllowNull.get();
    Schema schema = reflectData.getSchema(Msg.class);
    DatumReader<Msg> reader = new ReflectDatumReader<Msg>(schema);
    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    try {
        Msg msg = reader.read(null, decoder);
        System.out.println(msg.toString());
        route = this.getRoute(msg);
        if (route == ORDERDETAILUPDATED) {
            outputCollector.emit("ODU", tuple.getValues());
        } else if (route == ACCOUNTDETAILUPDATED) {
            outputCollector.emit("ADU", tuple.getValues());
        } else if (route == ASNSent) {
            outputCollector.emit("ASNS", tuple.getValues());
        } else {
            System.out.println("Message doesn't match a known type!");
        }
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputDeclarer) {
    outputDeclarer.declareStream("ODU", new Fields("ODU"));
    outputDeclarer.declareStream("ADU", new Fields("ADU"));
    outputDeclarer.declareStream("ASNS", new Fields("ASNS"));
}
As far as I can see it does the job. Let me know if you see something
horrible.
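
For what it's worth, the if/else chain in execute() could probably be reduced to a lookup from route to stream name. Below is a minimal sketch in plain Java with no Storm dependency. The Route enum and the StreamRouterSketch class are hypothetical names, and it assumes the route constants can be modelled as an enum, which may not match how getRoute() actually works:

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: maps a message route to the stream name to emit on. */
final class StreamRouterSketch {

    /** Assumed route types; the names mirror the constants used in execute(). */
    enum Route { ORDERDETAILUPDATED, ACCOUNTDETAILUPDATED, ASNSENT, UNKNOWN }

    // One entry per stream declared in declareOutputFields().
    private static final Map<Route, String> STREAMS = new EnumMap<>(Route.class);
    static {
        STREAMS.put(Route.ORDERDETAILUPDATED, "ODU");
        STREAMS.put(Route.ACCOUNTDETAILUPDATED, "ADU");
        STREAMS.put(Route.ASNSENT, "ASNS");
    }

    /** Returns the stream for a route, or empty if the route has no stream. */
    static Optional<String> streamFor(Route route) {
        return Optional.ofNullable(STREAMS.get(route));
    }

    public static void main(String[] args) {
        // Print the mapping for every route, including the unmapped one.
        for (Route route : Route.values()) {
            String target = streamFor(route).orElse("<no stream>");
            System.out.println(route + " -> " + target);
        }
    }
}
```

Inside execute(), the result would decide between calling outputCollector.emit(stream, tuple.getValues()) and logging the unknown type, so adding a new message type becomes one map entry plus one declareStream() call.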
Nick
On Wed Nov 05 2014 at 4:33:50 PM Nathan Leung <[email protected]> wrote:
> If you mean a situation where bolt a is subscribed to by bolts b and c,
> and you want certain tuples to go to b and others to c, streams are again
> your answer.
> On Nov 5, 2014 3:34 PM, "Nick Beenham" <[email protected]> wrote:
>
>> Thanks Nathan, I'm moving forward again. Though I'm working out how to
>> stop them emitting to all the bolts and doing some proper routing :)
>>
>> On Wed Nov 05 2014 at 3:07:00 PM Nathan Leung <[email protected]> wrote:
>>
>>> I think CSG needs to use the appropriate declare method:
>>> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html
>>>
>>> On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <[email protected]>
>>> wrote:
>>>
>>>> Thanks Nathan. The purpose is to do some rudimentary routing, I receive
>>>> messages of various types and need to treat them differently.
>>>>
>>>> When I follow your example:
>>>> //Bolts for message routing
>>>> builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT, "ODU");
>>>>
>>>> I get the error
>>>> 328 [main] WARN backtype.storm.StormSubmitter - Topology submission
>>>> exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
>>>> component [CSG]
>>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>>> [ODU] subscribes from non-existent stream: [ODU] of component [CSG])
>>>>
>>>> Am I missing another piece?
>>>>
>>>> Nick
>>>>
>>>>
>>>> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <[email protected]>
>>>> wrote:
>>>>
>>>>> Your emit is on stream "ODU", but when you subscribed you did not
>>>>> specify any streams (hence the message saying [ODU] subscribes from stream
>>>>> [default] of [CSG]). You need to do the emit without a stream, e.g.
>>>>> emit(tuple.getValues()), which will use the default stream. You only need
>>>>> to specify the stream in the emit if you subscribe to a specific stream
>>>>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>>>>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit
>>>>> on this stream you would do emit("<some stream>", tuple.getValues());.
>>>>>
>>>>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I'm getting an invalid topology error when trying to emit from one
>>>>>> bolt to another.
>>>>>>
>>>>>> From the create topology method:
>>>>>>
>>>>>> private static String ODU_BOLT = "ODU";
>>>>>>
>>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>>>>> //Bolts for message routing
>>>>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>>>>
>>>>>> The emit method inside the CSG_BOLT
>>>>>>
>>>>>> outputCollector.emit("ODU", tuple.getValues());
>>>>>>
>>>>>> But I'm getting the below error...
>>>>>>
>>>>>> 331 [main] WARN backtype.storm.StormSubmitter - Topology submission
>>>>>> exception: Component: [ODU] subscribes from non-existent stream:
>>>>>> [default] of component [CSG]
>>>>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>>>>> [ODU] subscribes from non-existent stream: [default] of component [CSG])
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Nick