Thanks Nathan, I'm moving forward again, though I'm still working out how to stop them emitting to all the bolts and do some proper routing :)
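
For anyone reading this in the archive: both errors quoted below seem to come down to one rule, namely that a bolt can only subscribe to a stream the upstream component has actually declared in declareOutputFields. Here is a minimal plain-Java sketch of that rule. StreamCheck and its methods are hypothetical stand-ins written for illustration, not Storm classes, and the sketch assumes CSGIntakeBolt had declared no streams at all when the errors appeared.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the submit-time stream check; not part of the Storm API.
class StreamCheck {
    // Storm's ID for the stream used when no stream is named.
    static final String DEFAULT_STREAM = "default";

    // Component ID -> stream IDs that component has declared.
    private final Map<String, Set<String>> declared = new HashMap<>();

    // Stands in for declarer.declare(fields), which declares the default stream.
    void declare(String component) {
        declareStream(component, DEFAULT_STREAM);
    }

    // Stands in for declarer.declareStream(streamId, fields).
    void declareStream(String component, String streamId) {
        declared.computeIfAbsent(component, k -> new HashSet<>()).add(streamId);
    }

    // Stands in for shuffleGrouping(source, streamId).
    // Returns null when the subscription is valid, otherwise the error text.
    String subscribe(String subscriber, String source, String streamId) {
        Set<String> streams = declared.get(source);
        if (streams != null && streams.contains(streamId)) {
            return null;
        }
        return "Component: [" + subscriber + "] subscribes from non-existent stream: ["
                + streamId + "] of component [" + source + "]";
    }

    public static void main(String[] args) {
        StreamCheck topology = new StreamCheck();

        // CSG has declared nothing, so both subscriptions from the thread are rejected.
        System.out.println(topology.subscribe("ODU", "CSG", DEFAULT_STREAM));
        System.out.println(topology.subscribe("ODU", "CSG", "ODU"));

        // Once CSG declares the "ODU" stream, the named subscription is accepted.
        topology.declareStream("CSG", "ODU");
        System.out.println(topology.subscribe("ODU", "CSG", "ODU"));
    }
}
```

In the real topology the equivalent step would be calling declarer.declareStream("ODU", new Fields(...)) inside CSGIntakeBolt's declareOutputFields, while keeping emit("ODU", ...) and shuffleGrouping(CSG_BOLT, "ODU") as they are. Declaring one stream per message type and subscribing each bolt to its own stream is also what keeps tuples from going to every bolt.
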
On Wed Nov 05 2014 at 3:07:00 PM Nathan Leung <[email protected]> wrote:
> I think CSG needs to use the appropriate declare method:
> https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html
>
> On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <[email protected]> wrote:
>
>> Thanks Nathan. The purpose is to do some rudimentary routing, I receive
>> messages of various types and need to treat them differently.
>>
>> When I follow your example:
>> //Bolts for message routing
>> builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT, "ODU");
>>
>> I get the error
>> 328 [main] WARN backtype.storm.StormSubmitter - Topology submission exception: Component: [ODU] subscribes from non-existent stream: [ODU] of component [CSG]
>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU] subscribes from non-existent stream: [ODU] of component [CSG])
>>
>> Am I missing another piece?
>>
>> Nick
>>
>>
>> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <[email protected]> wrote:
>>
>>> Your emit is on stream "ODU", but when you subscribed you did not
>>> specify any streams (hence the message saying [ODU] subscribes from stream
>>> [default] of [CSG]). You need to do the emit without a stream, e.g.
>>> emit(tuple.getValues()), which will use the default stream. You only need
>>> to specify the stream in the emit if you subscribe to a specific stream
>>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit on
>>> this stream you would do emit("<some stream>", tuple.getValues());.
>>>
>>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <[email protected]> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm getting an invalid topology error when trying to emit from one bolt
>>>> to another.
>>>>
>>>> From the create topology method:
>>>>
>>>> private static String ODU_BOLT = "ODU";
>>>>
>>>> TopologyBuilder builder = new TopologyBuilder();
>>>> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>>> //Bolts for message routing
>>>> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>>
>>>> The emit method inside the CSG_BOLT
>>>>
>>>> outputCollector.emit("ODU", tuple.getValues());
>>>>
>>>> But I'm getting the below error...
>>>>
>>>> 331 [main] WARN backtype.storm.StormSubmitter - Topology submission exception: Component: [ODU] subscribes from non-existent stream: [default] of component [CSG]
>>>> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU] subscribes from non-existent stream: [default] of component [CSG])
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Nick
>>>>
>>>
>>>
>
