If anyone has an example, that would be really helpful :)
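
For reference, here is a minimal sketch of the routing decision discussed further down the thread. It is plain Java with no Storm dependency so it can be run on its own, and the Storm wiring is shown only in comments. The class name CsgRouting, the "IDU" and "unrouted" stream names, and the single "message" field are assumptions for illustration; only "ODU" and CSG_BOLT come from the thread.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that picks the output stream for a message type.
// Only "ODU" appears in the thread; "IDU" and "unrouted" are made-up names.
class CsgRouting {
    // Every stream ID the CSG bolt may emit on. In Storm, each one has to be
    // declared in the bolt's declareOutputFields, roughly:
    //   for (String s : CsgRouting.STREAMS) {
    //       declarer.declareStream(s, new Fields("message"));
    //   }
    // (assumes each tuple carries exactly one value, named "message" here).
    static final List<String> STREAMS = Arrays.asList("ODU", "IDU", "unrouted");

    // Returns the stream for a message type, falling back to "unrouted"
    // when the type has no stream of its own.
    static String streamFor(String messageType) {
        return STREAMS.contains(messageType) ? messageType : "unrouted";
    }

    public static void main(String[] args) {
        // In the bolt's execute(Tuple input) the emit would become roughly:
        //   collector.emit(CsgRouting.streamFor(type), input, input.getValues());
        // and each downstream bolt subscribes to exactly one stream:
        //   builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT, "ODU");
        for (String type : Arrays.asList("ODU", "IDU", "XYZ")) {
            System.out.println(type + " -> " + streamFor(type));
        }
    }
}
```

Since each downstream bolt subscribes only to its own stream, a tuple emitted on "ODU" reaches ODUBolt and nothing else, which should stop the CSG bolt from emitting to all the bolts. How the message type is extracted from the tuple is left open here, because the thread does not describe the message format.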

On Wed Nov 05 2014 at 3:33:51 PM Nick Beenham <[email protected]>
wrote:

> Thanks Nathan, I'm moving forward again, though I'm still working out how to
> stop them emitting to all the bolts and do some proper routing :)
>
> On Wed Nov 05 2014 at 3:07:00 PM Nathan Leung <[email protected]> wrote:
>
>> I think CSG needs to use the appropriate declare method:
>> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html
>>
>> On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <[email protected]>
>> wrote:
>>
>>> Thanks Nathan. The purpose is to do some rudimentary routing: I receive
>>> messages of various types and need to treat them differently.
>>>
>>> When I follow your example:
>>> //Bolts for message routing
>>> builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT, "ODU");
>>>
>>> I get the error
>>> 328  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>> exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
>>> component [CSG]
>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>> [ODU] subscribes from non-existent stream: [ODU] of component [CSG])
>>>
>>> Am I missing another piece?
>>>
>>> Nick
>>>
>>>
>>> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <[email protected]> wrote:
>>>
>>>> Your emit is on stream "ODU", but when you subscribed you did not
>>>> specify any streams (hence the message saying [ODU] subscribes from stream
>>>> [default] of [CSG]).  You need to do the emit without a stream, e.g.
>>>> emit(tuple.getValues()), which will use the default stream.  You only need
>>>> to specify the stream in the emit if you subscribe to a specific stream
>>>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>>>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit
>>>> on this stream you would do emit("<some stream>", tuple.getValues());.
>>>>
>>>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm getting an invalid topology error when trying to emit from one
>>>>> bolt to another.
>>>>>
>>>>> From the create topology method:
>>>>>
>>>>> private static String ODU_BOLT = "ODU";
>>>>>
>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>>>> //Bolts for message routing
>>>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>>>
>>>>> The emit call inside the CSG_BOLT:
>>>>>
>>>>> outputCollector.emit("ODU", tuple.getValues());
>>>>>
>>>>> But I'm getting the below error...
>>>>>
>>>>> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
>>>>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>>>>> of component [CSG]
>>>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>>>> [ODU] subscribes from non-existent stream: [default] of component [CSG])
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Nick
>>>>>
>>>>
>>>>
>>
