Thanks Nathan, I'm moving forward again, though I'm still working out how to
stop tuples being emitted to all the bolts and do some proper routing :)
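
For anyone reading along later, here is a rough sketch of why both submissions
further down were rejected and what per-type routing looks like once the
streams are declared. It is a toy model in plain Java, not Storm code: the
class StreamRoutingSketch and its methods are made up for illustration, as are
the "OTHER" stream and the "OTHER_BOLT" component. In Storm itself the
corresponding pieces are OutputFieldsDeclarer.declareStream(streamId, fields),
called from the emitting bolt's declareOutputFields, and an emit whose first
argument is the stream id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of stream declaration and subscription. These are NOT Storm classes.
class StreamRoutingSketch {
    // component id -> stream ids that component has declared
    private final Map<String, Set<String>> declared = new HashMap<>();
    // source component id -> stream id -> subscribing component ids
    private final Map<String, Map<String, List<String>>> subscriptions = new HashMap<>();

    // Stands in for declaring an output stream on the emitting bolt.
    void declareStream(String component, String stream) {
        declared.computeIfAbsent(component, k -> new HashSet<>()).add(stream);
    }

    // Stands in for shuffleGrouping(source, stream) on the subscribing bolt.
    void subscribe(String subscriber, String source, String stream) {
        subscriptions.computeIfAbsent(source, k -> new HashMap<>())
                     .computeIfAbsent(stream, k -> new ArrayList<>())
                     .add(subscriber);
    }

    // Stands in for the submit-time check: each subscription must name a declared stream.
    void validate() {
        for (Map.Entry<String, Map<String, List<String>>> bySource : subscriptions.entrySet()) {
            Set<String> streams = declared.getOrDefault(bySource.getKey(), new HashSet<>());
            for (Map.Entry<String, List<String>> byStream : bySource.getValue().entrySet()) {
                if (!streams.contains(byStream.getKey())) {
                    throw new IllegalStateException("Component: " + byStream.getValue()
                            + " subscribes from non-existent stream: [" + byStream.getKey()
                            + "] of component [" + bySource.getKey() + "]");
                }
            }
        }
    }

    // Components that would receive a tuple emitted by `source` on `stream`.
    List<String> receivers(String source, String stream) {
        return subscriptions.getOrDefault(source, new HashMap<>())
                            .getOrDefault(stream, new ArrayList<>());
    }

    public static void main(String[] args) {
        StreamRoutingSketch topology = new StreamRoutingSketch();
        // CSG declares one stream per message type; "OTHER" is a made-up second type.
        topology.declareStream("CSG", "ODU");
        topology.declareStream("CSG", "OTHER");
        // Each routing bolt subscribes only to its own stream; "OTHER_BOLT" is made up.
        topology.subscribe("ODU", "CSG", "ODU");
        topology.subscribe("OTHER_BOLT", "CSG", "OTHER");
        topology.validate(); // passes because both streams were declared
        System.out.println(topology.receivers("CSG", "ODU"));
        System.out.println(topology.receivers("CSG", "OTHER"));
    }
}
```

In this model, as in the errors quoted below, a subscription to a stream the
source never declared is rejected, and that includes the default stream when
the source declares nothing at all.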

On Wed Nov 05 2014 at 3:07:00 PM Nathan Leung <[email protected]> wrote:

> I think CSG needs to declare the "ODU" stream in its declareOutputFields,
> using the appropriate declare method (declareStream):
> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html
>
> On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <[email protected]>
> wrote:
>
>> Thanks Nathan. The purpose is to do some rudimentary routing: I receive
>> messages of various types and need to treat them differently.
>>
>> When I follow your example:
>> // Bolts for message routing
>> builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT, "ODU");
>>
>> I get the error
>> 328  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>> exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
>> component [CSG]
>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
>> subscribes from non-existent stream: [ODU] of component [CSG])
>>
>> Am I missing another piece?
>>
>> Nick
>>
>>
>> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <[email protected]> wrote:
>>
>>> Your emit is on stream "ODU", but when you subscribed you did not
>>> specify any streams (hence the message saying [ODU] subscribes from stream
>>> [default] of [CSG]).  You need to do the emit without a stream, e.g.
>>> emit(tuple.getValues()), which will use the default stream.  You only need
>>> to specify the stream in the emit if you subscribe to a specific stream
>>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit on
>>> this stream you would do emit("<some stream>", tuple.getValues());.
>>>
>>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <[email protected]>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm getting an invalid topology error when trying to emit from one bolt
>>>> to another.
>>>>
>>>> From the create topology method:
>>>>
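>>>> // CSG_BOLT is assumed to be "CSG", matching the component id in the error
>>>> private static String CSG_BOLT = "CSG";
>>>> private static String ODU_BOLT = "ODU";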
>>>>
>>>> TopologyBuilder builder = new TopologyBuilder();
>>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>>> //Bolts for message routing
>>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>>
>>>> The emit call inside the CSG bolt:
>>>>
>>>> outputCollector.emit("ODU", tuple.getValues());
>>>>
>>>> But I'm getting the below error...
>>>>
>>>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>>>> of component [CSG]
>>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>>> [ODU] subscribes from non-existent stream: [default] of component [CSG])
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Nick
>>>>
>>>
>>>
>
