Yep that's it Nathan, I think I have my solution.

I have ...

TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);

        //builder.setBolt("print", new TestBolt()).shuffleGrouping("msgs");

        builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");

        //Bolts for message routing

        builder.setBolt("ODU", new ODUBolt()).fieldsGrouping(CSG_BOLT, "ODU", new Fields("ODU"));
        builder.setBolt(ADU_BOLT, new ADUBolt()).fieldsGrouping(CSG_BOLT, "ADU", new Fields("ADU"));
        builder.setBolt(ASNS_BOLT, new ASNSBolt()).fieldsGrouping(CSG_BOLT, "ASNS", new Fields("ASNS"));

and within the bolt:

        @Override
        public void execute(Tuple tuple, BasicOutputCollector outputCollector) {

                System.out.println("CSG Message received... ");
                byte[] data = tuple.getBinaryByField("bytes");          
                //
                System.out.println("CSG Message length is: " + data.length);

                ReflectData reflectData = ReflectData.AllowNull.get();
                Schema schema = reflectData.getSchema(Msg.class);

                DatumReader<Msg> reader = new ReflectDatumReader<Msg>(schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                try {
                        Msg msg = reader.read(null, decoder);
                        System.out.println(msg.toString());
                        route = this.getRoute(msg);

                        if(route == ORDERDETAILUPDATED){
                                outputCollector.emit("ODU", tuple.getValues());
                        }
                        else if(route == ACCOUNTDETAILUPDATED){
                                outputCollector.emit("ADU", tuple.getValues());
                        }else if (route == ASNSent){
                                outputCollector.emit("ASNS", tuple.getValues());
                        }
                        else {
                                System.out.println("Message doesn't match a known type!");
                        }       
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }

        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputDeclarer) {
                outputDeclarer.declareStream("ODU", new Fields("ODU"));
                outputDeclarer.declareStream("ADU", new Fields("ADU"));
                outputDeclarer.declareStream("ASNS", new Fields("ASNS"));
        }
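
The if/else chain in execute could also be expressed as a lookup from message type to stream ID. The following is only a sketch under assumptions: StreamRouter, streamFor, and the message type strings are hypothetical names made up for illustration, and only the stream IDs ("ODU", "ADU", "ASNS") come from the code above.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Storm: maps a message type to the ID of the
// stream it should be emitted on. The type names are assumed for illustration;
// the stream IDs match those declared in declareOutputFields.
final class StreamRouter {
    private static final Map<String, String> STREAM_BY_TYPE = Map.of(
            "OrderDetailUpdated", "ODU",
            "AccountDetailUpdated", "ADU",
            "ASNSent", "ASNS");

    private StreamRouter() {}

    // Returns the stream ID for a message type, or empty for an unknown (or
    // null) type so the caller can log and skip it instead of emitting.
    static Optional<String> streamFor(String messageType) {
        if (messageType == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(STREAM_BY_TYPE.get(messageType));
    }
}
```

Inside execute this might be used as StreamRouter.streamFor(type).ifPresent(s -> outputCollector.emit(s, tuple.getValues())), where type is however getRoute identifies the message. If route is a String, a lookup like this also avoids comparing with ==, which tests object identity rather than content.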


As far as I can see it does the job. Let me know if you see something
horrible.
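
For anyone reading this later: both InvalidTopologyException messages quoted below seem to come down to the same rule, namely that a bolt may only subscribe to a stream that the source component has declared. Here is a toy model of that rule in plain Java. It is not Storm's validation code; StreamCheck and its methods are invented for illustration, and only the message wording is copied from the errors in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: this is not Storm's validation code. It mimics the rule that
// every subscription must name a stream its source component has declared.
final class StreamCheck {
    // Stream ID used when a grouping names no stream explicitly.
    static final String DEFAULT_STREAM = "default";

    private final Map<String, Set<String>> declared = new HashMap<>();
    private final List<String[]> subscriptions = new ArrayList<>();

    // Records that `component` declares an output stream called `stream`.
    StreamCheck declare(String component, String stream) {
        declared.computeIfAbsent(component, k -> new HashSet<>()).add(stream);
        return this;
    }

    // Records that `subscriber` reads `stream` of `source`.
    StreamCheck subscribe(String subscriber, String source, String stream) {
        subscriptions.add(new String[] {subscriber, source, stream});
        return this;
    }

    // Returns one message per subscription whose stream was never declared.
    List<String> errors() {
        List<String> out = new ArrayList<>();
        for (String[] s : subscriptions) {
            Set<String> streams = declared.getOrDefault(s[1], Collections.emptySet());
            if (!streams.contains(s[2])) {
                out.add("Component: [" + s[0] + "] subscribes from non-existent stream: ["
                        + s[2] + "] of component [" + s[1] + "]");
            }
        }
        return out;
    }
}
```

In this model, declaring "ODU" on CSG while subscribing to the default stream reproduces the first error, subscribing to "ODU" without declaring it reproduces the second, and declaring and subscribing to the same stream ID yields no errors.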

Nick
On Wed Nov 05 2014 at 4:33:50 PM Nathan Leung <[email protected]> wrote:

> If you mean a situation where bolt a is subscribed to by bolts b and c,
> and you want certain tuples to go to b and others to c, streams are again
> your answer.
> On Nov 5, 2014 3:34 PM, "Nick Beenham" <[email protected]> wrote:
>
>> Thanks Nathan, I'm moving forward again. Though I'm working out how to
>> stop them emitting to all the bolts and doing some proper routing :)
>>
>> On Wed Nov 05 2014 at 3:07:00 PM Nathan Leung <[email protected]> wrote:
>>
>>> I think CSG needs to use the appropriate declare method:
>>> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html
>>>
>>> On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <[email protected]>
>>> wrote:
>>>
>>>> Thanks Nathan. The purpose is to do some rudimentary routing, I receive
>>>> messages of various types and need to treat them differently.
>>>>
>>>> When I follow your example:
>>>> //Bolts for message routing
>>>>         builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT, "ODU");
>>>>
>>>> I get the error
>>>> 328  [main] WARN  backtype.storm.StormSubmitter - Topology submission exception: Component: [ODU] subscribes from non-existent stream: [ODU] of component [CSG]
>>>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU] subscribes from non-existent stream: [ODU] of component [CSG])
>>>>
>>>> Am I missing another piece?
>>>>
>>>> Nick
>>>>
>>>>
>>>> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <[email protected]>
>>>> wrote:
>>>>
>>>>> Your emit is on stream "ODU", but when you subscribed you did not
>>>>> specify any streams (hence the message saying [ODU] subscribes from stream
>>>>> [default] of [CSG]).  You need to do the emit without a stream, e.g.
>>>>> emit(tuple.getValues()), which will use the default stream.  You only need
>>>>> to specify the stream in the emit if you subscribe to a specific stream
>>>>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>>>>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit
>>>>> on this stream you would do emit("<some stream>", tuple.getValues());.
>>>>>
>>>>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I'm getting an invalid topology error when trying to emit from one
>>>>>> bolt to another.
>>>>>>
>>>>>> From the create topology method:
>>>>>>
>>>>>> private static String ODU_BOLT = "ODU";
>>>>>>
>>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>>>>> //Bolts for message routing
>>>>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>>>>
>>>>>> The emit method inside the CSG_BOLT
>>>>>>
>>>>>> outputCollector.emit("ODU", tuple.getValues());
>>>>>>
>>>>>> But I'm getting the below error...
>>>>>>
>>>>>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission exception: Component: [ODU] subscribes from non-existent stream: [default] of component [CSG]
>>>>>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU] subscribes from non-existent stream: [default] of component [CSG])
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Nick
>>>>>>
>>>>>
>>>>>
>>>
