If anyone has an example, that would be really helpful :)

On Wed Nov 05 2014 at 3:33:51 PM Nick Beenham <[email protected]> wrote:
> Thanks Nathan, I'm moving forward again. Though I'm working out how to
> stop them emitting to all the bolts and doing some proper routing :)
>
> On Wed Nov 05 2014 at 3:07:00 PM Nathan Leung <[email protected]> wrote:
>
>> I think CSG needs to use the appropriate declare method:
>> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html
>>
>> On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <[email protected]>
>> wrote:
>>
>>> Thanks Nathan. The purpose is to do some rudimentary routing, I receive
>>> messages of various types and need to treat them differently.
>>>
>>> When I follow your example:
>>> //Bolts for message routing
>>> builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT, "ODU");
>>>
>>> I get the error
>>> 328 [main] WARN backtype.storm.StormSubmitter - Topology submission
>>> exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
>>> component [CSG]
>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>> [ODU] subscribes from non-existent stream: [ODU] of component [CSG])
>>>
>>> Am I missing another piece?
>>>
>>> Nick
>>>
>>>
>>> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <[email protected]> wrote:
>>>
>>>> Your emit is on stream "ODU", but when you subscribed you did not
>>>> specify any streams (hence the message saying [ODU] subscribes from stream
>>>> [default] of [CSG]). You need to do the emit without a stream, e.g.
>>>> emit(tuple.getValues()), which will use the default stream. You only need
>>>> to specify the stream in the emit if you subscribe to a specific stream
>>>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>>>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit
>>>> on this stream you would do emit("<some stream>", tuple.getValues());.
>>>>
>>>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm getting an invalid topology error when trying to emit from one
>>>>> bolt to another.
>>>>>
>>>>> From the create topology method:
>>>>>
>>>>> private static String ODU_BOLT = "ODU";
>>>>>
>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>>>> //Bolts for message routing
>>>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>>>
>>>>> The emit method inside the CSG_BOLT
>>>>>
>>>>> outputCollector.emit("ODU", tuple.getValues());
>>>>>
>>>>> But I'm getting the below error...
>>>>>
>>>>> 331 [main] WARN backtype.storm.StormSubmitter - Topology submission
>>>>> exception: Component: [ODU] subscribes from non-existent stream: [default]
>>>>> of component [CSG]
>>>>> Exception in thread "main" InvalidTopologyException(msg:Component:
>>>>> [ODU] subscribes from non-existent stream: [default] of component [CSG])
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Nick
>>>>>
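
Since an example was asked for: below is a rough, self-contained sketch of the rule that both errors in this thread come down to. A bolt may only subscribe to a stream that the upstream component has declared. This is a toy model in plain Java, not Storm's own code; the class `StreamRoutingSketch` and its methods are made up for illustration. In Storm itself, the declaring side would be a call such as `declarer.declareStream("ODU", new Fields(...))` inside the CSG bolt's `declareOutputFields`, paired with `emit("ODU", ...)` and `shuffleGrouping(CSG_BOLT, "ODU")`; the field names are yours to pick.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of stream declaration and subscription checks; NOT Storm's implementation. */
final class StreamRoutingSketch {
    // component id -> stream ids that component has declared
    private final Map<String, Set<String>> declared = new HashMap<>();
    // each entry is {subscriber, source component, stream id}
    private final List<String[]> subscriptions = new ArrayList<>();

    /** Stands in for declaring a named output stream in the emitting bolt. */
    void declareStream(String component, String streamId) {
        declared.computeIfAbsent(component, k -> new HashSet<>()).add(streamId);
    }

    /** Stands in for shuffleGrouping(source, streamId) on the subscribing bolt. */
    void subscribe(String subscriber, String source, String streamId) {
        subscriptions.add(new String[] {subscriber, source, streamId});
    }

    /** Returns one message per subscription to a stream the source never declared. */
    List<String> validate() {
        List<String> errors = new ArrayList<>();
        for (String[] s : subscriptions) {
            Set<String> streams = declared.getOrDefault(s[1], new HashSet<>());
            if (!streams.contains(s[2])) {
                errors.add("Component: [" + s[0] + "] subscribes from non-existent stream: ["
                        + s[2] + "] of component [" + s[1] + "]");
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // Case from the thread: ODU subscribes to CSG's "ODU" stream,
        // but CSG never declared a stream with that id.
        StreamRoutingSketch broken = new StreamRoutingSketch();
        broken.subscribe("ODU", "CSG", "ODU");
        System.out.println(broken.validate());

        // Same subscription once CSG declares the "ODU" stream: no errors.
        StreamRoutingSketch fixed = new StreamRoutingSketch();
        fixed.declareStream("CSG", "ODU");
        fixed.subscribe("ODU", "CSG", "ODU");
        System.out.println(fixed.validate());
    }
}
```

For routing by message type, the same pattern would repeat per type: one declared stream per type in the CSG bolt, one emit per stream id, and each downstream bolt subscribing only to the stream id it cares about.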
