Yes, though you might want to use collector.emit(streamId, tuple, values) instead of collector.emit(streamId, values). Passing the input tuple anchors the emitted tuple to it, so Storm keeps tracking whether the tuple is acked downstream.
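To make the per-stream condition concrete, here is a minimal sketch of the routing decision pulled out into a small helper, so it can be checked without a running topology. StreamRouter and streamFor are hypothetical names for illustration, not part of Storm, and the stream ids are just the "stream1"/"stream2" examples from this thread.

```java
import java.util.Map;

// Hypothetical helper (not part of Storm): picks the output stream
// for a parsed message based on its "_c" value.
final class StreamRouter {
    static final String STREAM_A = "stream1"; // intended for BoltOne
    static final String STREAM_B = "stream2"; // intended for BoltTwo

    private StreamRouter() {}

    // Returns the stream id for the message, or null if "_c" is
    // missing or has a value other than "a" or "b".
    static String streamFor(Map<String, ?> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) {
            return STREAM_A;
        }
        if ("b".equals(c)) {
            return STREAM_B;
        }
        return null; // caller decides whether to drop or fail the tuple
    }
}
```

Inside execute you would then call something like collector.emit(StreamRouter.streamFor(map), tuple, new Values(map)) followed by collector.ack(tuple), after checking for null. Both streams have to be declared in declareOutputFields with declarer.declareStream(streamId, fields), and each downstream bolt subscribes to its stream, e.g. shuffleGrouping("originBolt", "stream1"), where "originBolt" is whatever component id you gave the bolt.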
Also, you might consider extending BaseBasicBolt (https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/topology/base/BaseBasicBolt.html) instead of BaseRichBolt for bolts that just receive a tuple, do some processing and immediately emit one or more new tuples. BaseBasicBolt takes care of acking and anchoring for you, so it can make your code a little simpler.

2017-10-08 19:35 GMT+02:00 Noppanit Charassinvichai <[email protected]>:

> I do have a follow-up question after the question on Stackoverflow.
> Do I need a condition in my bolt for two different streams?
>
> For example
>
> if (tuple.get("_c").equals("a")) {
>     collector.emit("stream1", new Values("field1Value"));
> } else {
>     collector.emit("stream12", new Values("field1Value"));
> }
>
>
> On Sun, 8 Oct 2017 at 13:28 Noppanit Charassinvichai <[email protected]>
> wrote:
>
>> Thanks for the reply.
>>
>> Yes that's what I wanted to achieve.
>>
>> On Sun, 8 Oct 2017 at 13:26 Stig Rohde Døssing <[email protected]> wrote:
>>
>>> Hi Noppanit,
>>>
>>> Just to make sure we're talking about the same thing, you want to send
>>> from OriginBolt tuples where _c="a" to e.g. BoltOne and tuples where _c="b"
>>> to BoltTwo, where BoltOne and BoltTwo are two different bolt
>>> implementations.
>>>
>>> You need to make your bolt define multiple output streams, e.g.
>>> "stream1", "stream2". You then need to make it emit _c="a" to "stream1" and
>>> _c="b" to "stream2". When you build your topology, you then make BoltOne
>>> listen to "stream1" and BoltTwo listen to "stream2".
>>>
>>> There's some good example code here
>>> https://stackoverflow.com/questions/19807395/how-would-i-split-a-stream-in-apache-storm
>>>
>>> 2017-10-08 18:48 GMT+02:00 Noppanit Charassinvichai <[email protected]>:
>>>
>>>> Hi,
>>>>
>>>> I have a Storm cluster connecting to Kinesis Stream. The message looks
>>>> like this.
>>>>
>>>> {
>>>>   _c: "a"
>>>> }
>>>>
>>>> or it should be
>>>>
>>>> {
>>>>   _c: "b"
>>>> }
>>>>
>>>> I would like to send a tuple with _c="a" to one bolt and _c="b" to a
>>>> different bolt. How do I achieve this?
>>>>
>>>> This is the bolt that parsing the message from Kinesis to JSON Object
>>>> using GSon
>>>>
>>>> @Override
>>>> public void execute(Tuple tuple) {
>>>>     String partitionKey = (String) tuple.getValueByField(
>>>>         SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
>>>>     String sequenceNumber = (String) tuple.getValueByField(
>>>>         SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
>>>>     byte[] payload = (byte[]) tuple.getValueByField(
>>>>         SampleKinesisRecordScheme.FIELD_RECORD_DATA);
>>>>
>>>>     ByteBuffer buffer = ByteBuffer.wrap(payload);
>>>>     String data = null;
>>>>     try {
>>>>         data = decoder.decode(buffer).toString();
>>>>
>>>>         HashMap<String, String> map = new Gson().fromJson(data,
>>>>             new TypeToken<HashMap<String, Object>>() {}.getType());
>>>>
>>>>         this.outputCollector.emit(tuple, new Values(map));
>>>>         this.outputCollector.ack(tuple);
>>>>
>>>>     } catch (CharacterCodingException e) {
>>>>         this.outputCollector.fail(tuple);
>>>>     }
>>>>
>>>> }
>>>>
>>>> Thanks
>>>>
>>>
>>>
