Thanks. I'll try that.

On Sun, 8 Oct 2017 at 13:40 Stig Rohde Døssing <[email protected]> wrote:
> Yes, though you might want to use collector.emit(streamid, tuple, value)
> instead of collector.emit(streamid, value) so Storm keeps tracking whether
> the tuple is acked downstream.
>
> Also you might consider extending
> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/topology/base/BaseBasicBolt.html
> instead of BaseRichBolt for bolts that just receive a tuple, do some
> processing and immediately emit one or more new tuples. BaseBasicBolt takes
> care of acking and anchoring for you, so it can make your code a little
> simpler.
>
> 2017-10-08 19:35 GMT+02:00 Noppanit Charassinvichai <[email protected]>:
>
>> I do have a follow-up question after the question on Stack Overflow.
>> Do I need a condition in my bolt for two different streams?
>>
>> For example
>>
>> if (tuple.get("_c").equals("a")) {
>>     collector.emit("stream1", new Values("field1Value"));
>> } else {
>>     collector.emit("stream2", new Values("field1Value"));
>> }
>>
>>
>> On Sun, 8 Oct 2017 at 13:28 Noppanit Charassinvichai <[email protected]> wrote:
>>
>>> Thanks for the reply.
>>>
>>> Yes, that's what I wanted to achieve.
>>>
>>> On Sun, 8 Oct 2017 at 13:26 Stig Rohde Døssing <[email protected]> wrote:
>>>
>>>> Hi Noppanit,
>>>>
>>>> Just to make sure we're talking about the same thing, you want to send
>>>> from OriginBolt tuples where _c="a" to e.g. BoltOne and tuples where _c="b"
>>>> to BoltTwo, where BoltOne and BoltTwo are two different bolt
>>>> implementations.
>>>>
>>>> You need to make your bolt define multiple output streams, e.g.
>>>> "stream1", "stream2". You then need to make it emit _c="a" to "stream1" and
>>>> _c="b" to "stream2". When you build your topology, you then make BoltOne
>>>> listen to "stream1" and BoltTwo listen to "stream2".
>>>>
>>>> There's some good example code here
>>>> https://stackoverflow.com/questions/19807395/how-would-i-split-a-stream-in-apache-storm
>>>>
>>>> 2017-10-08 18:48 GMT+02:00 Noppanit Charassinvichai <[email protected]>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a Storm cluster connecting to a Kinesis stream. The message looks
>>>>> like this:
>>>>>
>>>>> {
>>>>>   _c: "a"
>>>>> }
>>>>>
>>>>> or
>>>>>
>>>>> {
>>>>>   _c: "b"
>>>>> }
>>>>>
>>>>> I would like to send a tuple with _c="a" to one bolt and _c="b" to a
>>>>> different bolt. How do I achieve this?
>>>>>
>>>>> This is the bolt that parses the message from Kinesis into a JSON object
>>>>> using Gson:
>>>>>
>>>>> @Override
>>>>> public void execute(Tuple tuple) {
>>>>>     String partitionKey = (String)
>>>>>         tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
>>>>>     String sequenceNumber = (String)
>>>>>         tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
>>>>>     byte[] payload = (byte[])
>>>>>         tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);
>>>>>
>>>>>     ByteBuffer buffer = ByteBuffer.wrap(payload);
>>>>>     String data = null;
>>>>>     try {
>>>>>         data = decoder.decode(buffer).toString();
>>>>>
>>>>>         // Declared type matches the TypeToken (values are Object, not String).
>>>>>         HashMap<String, Object> map = new Gson().fromJson(data,
>>>>>             new TypeToken<HashMap<String, Object>>() {}.getType());
>>>>>
>>>>>         this.outputCollector.emit(tuple, new Values(map));
>>>>>         this.outputCollector.ack(tuple);
>>>>>
>>>>>     } catch (CharacterCodingException e) {
>>>>>         this.outputCollector.fail(tuple);
>>>>>     }
>>>>> }
>>>>>
>>>>> Thanks
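
For readers following the thread, the routing decision discussed above can be isolated from the Storm API and sketched as plain Java. This is only a sketch under assumptions: the class name StreamRouter and its method streamFor are hypothetical helpers, not part of Storm, and the stream names "stream1" and "stream2" are the example names used in the thread.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical helper (not part of Storm): picks the output stream id for a
 * parsed message based on its "_c" field, as described in the thread.
 */
final class StreamRouter {
    static final String STREAM_ONE = "stream1"; // intended for BoltOne
    static final String STREAM_TWO = "stream2"; // intended for BoltTwo

    private StreamRouter() {}

    /** Returns the stream id, or empty when "_c" is missing or neither "a" nor "b". */
    static Optional<String> streamFor(Map<String, ?> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) {
            return Optional.of(STREAM_ONE);
        }
        if ("b".equals(c)) {
            return Optional.of(STREAM_TWO);
        }
        // Unlike a bare if/else, unknown values are not silently sent to stream2.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(streamFor(Map.of("_c", "a")));
        System.out.println(streamFor(Map.of("_c", "b")));
        System.out.println(streamFor(Map.of("_c", "z")));
    }
}
```

Inside the bolt's execute method, the result would presumably feed the anchored emit mentioned above, e.g. collector.emit(streamId, tuple, new Values(map)). Both streams would also need to be declared with declarer.declareStream(...) in declareOutputFields, and each downstream bolt subscribed to its stream when building the topology, e.g. shuffleGrouping(componentId, "stream1"). The component ids and field names there depend on your topology and are not given in the thread.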
