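I think CSG needs to declare the "ODU" stream itself, using the appropriate
declare method (declareStream on the OutputFieldsDeclarer passed to the bolt's
declareOutputFields):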
https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html
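
To make the rule behind both errors concrete, here is a toy model in plain
Java. It is not Storm code: the class StreamCheck and its methods are made up
for illustration and only mimic the check that a subscription must name a
stream the source component has declared. In the real bolt the corresponding
step would be a declareStream("ODU", ...) call in CSG's declareOutputFields,
with whatever Fields CSG actually emits.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only, NOT Storm's implementation. It mimics one rule: a
// subscription must name a stream that the source component has declared.
final class StreamCheck {
    // component id -> stream ids that component has declared
    private final Map<String, Set<String>> declared = new HashMap<>();

    // Stands in for declarer.declareStream(streamId, fields) in the source bolt.
    StreamCheck declareStream(String component, String streamId) {
        declared.computeIfAbsent(component, k -> new HashSet<>()).add(streamId);
        return this;
    }

    // Stands in for shuffleGrouping(source, streamId) plus submit-time validation.
    // Returns null when the subscription is valid, otherwise a message shaped
    // like the one quoted in this thread.
    String subscribe(String subscriber, String source, String streamId) {
        Set<String> streams = declared.getOrDefault(source, Collections.emptySet());
        if (streams.contains(streamId)) {
            return null;
        }
        return "Component: [" + subscriber + "] subscribes from non-existent stream: ["
                + streamId + "] of component [" + source + "]";
    }

    public static void main(String[] args) {
        // CSG declares nothing: both the default and the "ODU" subscription fail.
        StreamCheck undeclared = new StreamCheck();
        System.out.println(undeclared.subscribe("ODU", "CSG", "default"));
        System.out.println(undeclared.subscribe("ODU", "CSG", "ODU"));

        // CSG declares "ODU": the subscription to "ODU" is accepted (prints null).
        StreamCheck fixed = new StreamCheck().declareStream("CSG", "ODU");
        System.out.println(fixed.subscribe("ODU", "CSG", "ODU"));
    }
}
```

In this model the first two calls reproduce the two messages from the thread,
and only the declared stream passes, which is why the emit and the
shuffleGrouping agreeing on "ODU" is not enough on its own.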

On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <[email protected]> wrote:

> Thanks Nathan. The purpose is to do some rudimentary routing: I receive
> messages of various types and need to treat them differently.
>
> When I follow your example:
> //Bolts for message routing
> builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT, "ODU");
>
> I get the error
> 328  [main] WARN  backtype.storm.StormSubmitter - Topology submission
> exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
> component [CSG]
> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
> subscribes from non-existent stream: [ODU] of component [CSG])
>
> Am I missing another piece?
>
> Nick
>
>
> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <[email protected]> wrote:
>
>> Your emit is on stream "ODU", but when you subscribed you did not specify
>> any streams (hence the message saying [ODU] subscribes from stream
>> [default] of [CSG]).  You need to do the emit without a stream, e.g.
>> emit(tuple.getValues()), which will use the default stream.  You only need
>> to specify the stream in the emit if you subscribe to a specific stream
>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit on
>> this stream you would do emit("<some stream>", tuple.getValues());.
>>
>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <[email protected]>
>> wrote:
>>
>>> Hi All,
>>>
>>> I'm getting an invalid topology error when trying to emit from one bolt
>>> to another.
>>>
>>> From the create topology method:
>>>
>>> private static String ODU_BOLT = "ODU";
>>>
>>> TopologyBuilder builder = new TopologyBuilder();
>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>> //Bolts for message routing
>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>
>>> The emit method inside the CSG_BOLT
>>>
>>> outputCollector.emit("ODU", tuple.getValues());
>>>
>>> But I'm getting the below error...
>>>
>>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>>> of component [CSG]
>>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
>>> subscribes from non-existent stream: [default] of component [CSG])
>>>
>>>
>>> Regards,
>>>
>>> Nick
>>>
>>
>>
