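I think CSG needs to declare the "ODU" stream in its declareOutputFields, using the appropriate declare method (declareStream rather than declare, which only covers the default stream): https://storm.incubator.apache.org/apidocs/backtype/storm/topology/OutputFieldsDeclarer.html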
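
For illustration, here is a rough sketch of declaring a named "ODU" stream. So that it compiles without the Storm jar, Fields and OutputFieldsDeclarer below are small stand-ins that only mirror the shape of the real backtype.storm classes, and RecordingDeclarer is a hypothetical helper. The field name "message" is also an assumption; use whatever fields the bolt actually emits.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DeclareStreamSketch {
    // Stand-in for backtype.storm.tuple.Fields (NOT the real class).
    static final class Fields {
        final List<String> names;
        Fields(String... names) { this.names = Arrays.asList(names); }
    }

    // Stand-in mirroring two methods of
    // backtype.storm.topology.OutputFieldsDeclarer (NOT the real interface).
    interface OutputFieldsDeclarer {
        void declare(Fields fields);                        // default stream only
        void declareStream(String streamId, Fields fields); // a named stream
    }

    // Hypothetical helper: records which stream ids a bolt declared.
    static final class RecordingDeclarer implements OutputFieldsDeclarer {
        final Map<String, Fields> streams = new HashMap<>();
        public void declare(Fields fields) { streams.put("default", fields); }
        public void declareStream(String streamId, Fields fields) {
            streams.put(streamId, fields);
        }
    }

    // Roughly what CSGIntakeBolt.declareOutputFields might contain.
    // "message" is an assumed field name, not taken from the thread.
    static void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("ODU", new Fields("message"));
    }

    // Runs the declaration and returns the set of declared stream ids.
    static Set<String> declaredStreams() {
        RecordingDeclarer declarer = new RecordingDeclarer();
        declareOutputFields(declarer);
        return declarer.streams.keySet();
    }

    public static void main(String[] args) {
        Set<String> streams = declaredStreams();
        System.out.println("ODU declared: " + streams.contains("ODU"));
        System.out.println("default declared: " + streams.contains("default"));
    }
}
```

In the real bolt, only the body of declareOutputFields matters. With "ODU" declared there, shuffleGrouping(CSG_BOLT, "ODU") has a stream to subscribe to, and emit("ODU", tuple.getValues()) has a declared stream to emit on.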

On Wed, Nov 5, 2014 at 3:02 PM, Nick Beenham <[email protected]> wrote:

> Thanks Nathan. The purpose is to do some rudimentary routing, I receive
> messages of various types and need to treat them differently.
>
> When I follow your example:
>
>     //Bolts for message routing
>     builder.setBolt("ODU", new ODUBolt()).shuffleGrouping(CSG_BOLT, "ODU");
>
> I get the error
>
>     328 [main] WARN backtype.storm.StormSubmitter - Topology submission
>     exception: Component: [ODU] subscribes from non-existent stream: [ODU] of
>     component [CSG]
>     Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
>     subscribes from non-existent stream: [ODU] of component [CSG])
>
> Am I missing another piece?
>
> Nick
>
> On Wed Nov 05 2014 at 2:47:06 PM Nathan Leung <[email protected]> wrote:
>
>> Your emit is on stream "ODU", but when you subscribed you did not specify
>> any streams (hence the message saying [ODU] subscribes from stream
>> [default] of [CSG]). You need to do the emit without a stream, e.g.
>> emit(tuple.getValues()), which will use the default stream. You only need
>> to specify the stream in the emit if you subscribe to a specific stream
>> when you create the topology, e.g. builder.setBolt(ODU_BOLT, new
>> ODUBolt()).shuffleGrouping(CSG_BOLT, "<some stream>");, then to emit on
>> this stream you would do emit("<some stream>", tuple.getValues());.
>>
>> On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> I'm getting an invalid topology error when trying to emit from one bolt
>>> to another.
>>>
>>> From the create topology method:
>>>
>>>     private static String ODU_BOLT = "ODU";
>>>
>>>     TopologyBuilder builder = new TopologyBuilder();
>>>     builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
>>>     builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
>>>     //Bolts for message routing
>>>     builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>>>
>>> The emit method inside the CSG_BOLT
>>>
>>>     outputCollector.emit("ODU", tuple.getValues());
>>>
>>> But I'm getting the below error...
>>>
>>>     331 [main] WARN backtype.storm.StormSubmitter - Topology submission
>>>     exception: Component: [ODU] subscribes from non-existent stream: [default]
>>>     of component [CSG]
>>>     Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
>>>     subscribes from non-existent stream: [default] of component [CSG])
>>>
>>> Regards,
>>>
>>> Nick
