Your bolt emits on the stream "ODU", but the subscription does not name a
stream, so ODU subscribes to the default stream of CSG (hence the message
saying [ODU] subscribes from stream [default] of [CSG]).  The simplest fix
is to emit without a stream id, e.g. emit(tuple.getValues()), which uses
the default stream.  You only need to name the stream in the emit if you
subscribe to that specific stream when you create the topology, e.g.
builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT,
"<some stream>");, and then emit on it with
emit("<some stream>", tuple.getValues());.  Either way, CSGIntakeBolt has
to declare the stream in declareOutputFields: declarer.declare(...) for
the default stream, or declarer.declareStream("<some stream>", ...) for a
named one.
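
To make the mismatch concrete, here is a toy model of the check that
rejects the topology.  It is not Storm code and uses none of Storm's API:
TopologyModel, declare, subscribe and validate are hypothetical names, and
I am assuming your CSG bolt declares only the "ODU" stream.  It just shows
that a subscription is valid only when the source component declares the
stream being subscribed to.

```java
import java.util.*;

// Toy model only: NOT Storm code. All names here are hypothetical.
class TopologyModel {
    static final String DEFAULT_STREAM = "default";

    // component id -> stream ids that component declares as outputs
    private final Map<String, Set<String>> declared = new HashMap<>();
    // each entry is {subscriber, source component, source stream}
    private final List<String[]> subscriptions = new ArrayList<>();

    TopologyModel declare(String component, String... streams) {
        declared.computeIfAbsent(component, k -> new HashSet<>())
                .addAll(Arrays.asList(streams));
        return this;
    }

    // Subscribing without a stream id means the default stream.
    TopologyModel subscribe(String subscriber, String source) {
        return subscribe(subscriber, source, DEFAULT_STREAM);
    }

    TopologyModel subscribe(String subscriber, String source, String stream) {
        subscriptions.add(new String[] {subscriber, source, stream});
        return this;
    }

    // Returns one message per subscription whose stream was never declared.
    List<String> validate() {
        List<String> errors = new ArrayList<>();
        for (String[] s : subscriptions) {
            Set<String> streams =
                declared.getOrDefault(s[1], Collections.<String>emptySet());
            if (!streams.contains(s[2])) {
                errors.add("Component: [" + s[0]
                    + "] subscribes from non-existent stream: [" + s[2]
                    + "] of component [" + s[1] + "]");
            }
        }
        return errors;
    }
}
```

In this model, declare("CSG", "ODU") followed by subscribe("ODU", "CSG")
yields one error, because the subscription falls back to the default
stream.  Either declaring "default" on CSG, or subscribing with
subscribe("ODU", "CSG", "ODU"), makes validate() return an empty list.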

On Wed, Nov 5, 2014 at 2:33 PM, Nick Beenham <[email protected]> wrote:

> Hi All,
>
> I'm getting an invalid topology error when trying to emit from one bolt to
> another.
>
> From the create topology method:
>
> private static String ODU_BOLT = "ODU";
>
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("msgs", new KafkaSpout(kafkaConfig), 1);
> builder.setBolt(CSG_BOLT, new CSGIntakeBolt()).shuffleGrouping("msgs");
> //Bolts for message routing
> builder.setBolt(ODU_BOLT, new ODUBolt()).shuffleGrouping(CSG_BOLT);
>
> The emit method inside the CSG_BOLT
>
> outputCollector.emit("ODU", tuple.getValues());
>
> But I'm getting the below error...
>
> 331  [main] WARN  backtype.storm.StormSubmitter - Topology submission
> exception: Component: [ODU] subscribes from non-existent stream: [default]
> of component [CSG]
> Exception in thread "main" InvalidTopologyException(msg:Component: [ODU]
> subscribes from non-existent stream: [default] of component [CSG])
>
>
> Regards,
>
> Nick
>
