Thanks Nathan. The purpose is to do some rudimentary routing: I receive
messages of various types and need to treat them differently.

When I follow your example:
//Bolts for message routing
builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT, "ODU");

I get the error:
328  [main] WARN  backtype.storm.StormSubmitter - Topology submission
exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
component [CSG]
Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
subscribes from non-existent stream: [ODU] of component [CSG])

Am I missing another piece?

Nick
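
For what it's worth, the error text suggests the likely missing piece: the
CSG bolt emits on "ODU" but apparently never declares that stream. In Storm a
bolt declares each named stream in declareOutputFields, e.g.
declarer.declareStream("ODU", new Fields(...)) with whatever field names the
tuple carries, and only then can another component subscribe to it. The
sketch below is NOT Storm code. It is a plain-Java toy model (the class
StreamCheck and its methods are hypothetical names) that mimics the
declare-then-subscribe check and reproduces the message seen above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: it imitates the check behind the submission error
// and does not use or reproduce any Storm classes.
class StreamCheck {
    // component id -> stream ids that component has declared
    private final Map<String, Set<String>> declared = new HashMap<>();

    // Stands in for what declarer.declareStream(streamId, fields) records
    // when called from a bolt's declareOutputFields method.
    void declareStream(String component, String streamId) {
        declared.computeIfAbsent(component, k -> new HashSet<>()).add(streamId);
    }

    // Stands in for shuffleGrouping(source, streamId).
    // Returns null when the subscription is valid, otherwise the error text.
    String subscribe(String subscriber, String source, String streamId) {
        Set<String> streams = declared.getOrDefault(source, new HashSet<>());
        if (streams.contains(streamId)) {
            return null;
        }
        return "Component: [" + subscriber + "] subscribes from non-existent stream: ["
                + streamId + "] of component [" + source + "]";
    }

    public static void main(String[] args) {
        StreamCheck topology = new StreamCheck();

        // Before the fix: nothing declared the "ODU" stream on CSG.
        System.out.println(topology.subscribe("ODU", "CSG", "ODU"));

        // After the fix: CSG declares "ODU", so the same subscription passes.
        topology.declareStream("CSG", "ODU");
        System.out.println(topology.subscribe("ODU", "CSG", "ODU") == null);
    }
}
```

In the real topology the equivalent change would go in CSGIntakeBolt's
declareOutputFields, with the emit on "ODU" and the
shuffleGrouping(CSG_BOLT, "ODU") subscription left as they are.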


On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <[email protected]> wrote:

> Your emit is on stream "ODU", but when you subscribed you did not specify
> any streams (hence the message saying [ODU] subscribes from stream
> [default] of [CSG]).  You need to do the emit without a stream, e.g.
> emit(tuple.getValues()), which will use the default stream.  You only need
> to specify the stream in the emit if you subscribe to a specific stream
> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit on
> this stream you would do emit("<some stream>", tuple.getValues());.
>
> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <[email protected]>
> wrote:
>
>> Hi All,
>>
>> I'm getting an invalid topology error when trying to emit from one bolt
>> to another.
>>
>> From the create topology method:
>>
>> private static String ODU_BOLT = "ODU";
>>
>> TopologyBuilder builder = new TopologyBuilder();
>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>> //Bolts for message routing
>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>
>> The emit method inside the CSG_BOLT
>>
>> outputCollector.emit("ODU", tuple.getValues());
>>
>> But I'm getting the below error...
>>
>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>> of component [CSG]
>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
>> subscribes from non-existent stream: [default] of component [CSG])
>>
>>
>> Regards,
>>
>> Nick
>>
>
>
