Yes, though you might want to use collector.emit(streamId, tuple, values)
instead of collector.emit(streamId, values). Passing the input tuple anchors
the new tuple to it, so Storm keeps tracking whether it is acked downstream.
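
If it helps, the condition can live in one small helper so that execute() stays short. This is only a sketch in plain Java, with the Storm calls shown in comments. StreamRouter and streamFor are names I made up; "stream1" and "stream2" are the stream ids from my earlier mail.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: maps the value of the "_c" field to an output stream id. */
final class StreamRouter {
    // Stream ids from the earlier mail. Each id must also be declared in the
    // bolt's declareOutputFields via declarer.declareStream(id, fields).
    static final String STREAM_A = "stream1";
    static final String STREAM_B = "stream2";

    private StreamRouter() {}

    /** Returns the stream for a parsed message, or empty if "_c" is missing or unknown. */
    static Optional<String> streamFor(Map<String, ?> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) {
            return Optional.of(STREAM_A);
        }
        if ("b".equals(c)) {
            return Optional.of(STREAM_B);
        }
        return Optional.empty();
    }
}

// Inside the bolt's execute(Tuple tuple), after parsing the payload into `map`:
//   Optional<String> stream = StreamRouter.streamFor(map);
//   if (stream.isPresent()) {
//       // anchored emit: the input tuple is the second argument
//       outputCollector.emit(stream.get(), tuple, new Values(map));
//   }
//   outputCollector.ack(tuple);
```

Returning an empty Optional for unexpected values means the bolt can ack the input without emitting anything. Whether that or failing the tuple is the right choice depends on your data.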

Also you might consider extending
https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/topology/base/BaseBasicBolt.html
instead of BaseRichBolt for bolts that just receive a tuple, do some
processing and immediately emit one or more new tuples. BaseBasicBolt takes
care of acking and anchoring for you, so it can make your code a little
simpler.
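
For illustration, a splitting bolt based on BaseBasicBolt could look roughly like the sketch below. It needs storm-core on the classpath. SplitBolt and the output field name "message" are names I made up, and I am assuming the upstream bolt emits the parsed map as its first field.

```java
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt: routes each message to "stream1" or "stream2" based on "_c".
public class SplitBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Assumption: the first field of the input tuple is the parsed JSON map.
        Map<?, ?> message = (Map<?, ?>) input.getValue(0);
        Object c = message.get("_c");

        if ("a".equals(c)) {
            // Emits through BasicOutputCollector are anchored to the input automatically.
            collector.emit("stream1", new Values(message));
        } else if ("b".equals(c)) {
            collector.emit("stream2", new Values(message));
        }
        // No explicit ack: the input is acked once execute returns normally.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Every stream the bolt emits to has to be declared here.
        declarer.declareStream("stream1", new Fields("message"));
        declarer.declareStream("stream2", new Fields("message"));
    }
}
```

When you build the topology, BoltOne would then subscribe with something like shuffleGrouping("split", "stream1") and BoltTwo with shuffleGrouping("split", "stream2"), where "split" is whatever component id you give this bolt.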

2017-10-08 19:35 GMT+02:00 Noppanit Charassinvichai <[email protected]>:

> I do have a follow-up question after the question on Stackoverflow.
> Do I need a condition in my bolt for two different streams?
>
> For example
>
> if (tuple.get("_c").equals("a")) {
>   collector.emit("stream1", new Values("field1Value"));
> } else {
>   collector.emit("stream2", new Values("field1Value"));
> }
>
>
> On Sun, 8 Oct 2017 at 13:28 Noppanit Charassinvichai <[email protected]>
> wrote:
>
>> Thanks for the reply.
>>
>> Yes that's what I wanted to achieve.
>>
>> On Sun, 8 Oct 2017 at 13:26 Stig Rohde Døssing <[email protected]> wrote:
>>
>>> Hi Noppanit,
>>>
>>> Just to make sure we're talking about the same thing, you want to send
>>> from OriginBolt tuples where _c="a" to e.g. BoltOne and tuples where _c="b"
>>> to BoltTwo, where BoltOne and BoltTwo are two different bolt
>>> implementations.
>>>
>>> You need to make your bolt define multiple output streams, e.g.
>>> "stream1", "stream2". You then need to make it emit _c="a" to "stream1" and
>>> _c="b" to "stream2". When you build your topology, you then make BoltOne
>>> listen to "stream1" and BoltTwo listen to "stream2".
>>>
>>> There's some good example code here:
>>> https://stackoverflow.com/questions/19807395/how-would-i-split-a-stream-in-apache-storm
>>>
>>> 2017-10-08 18:48 GMT+02:00 Noppanit Charassinvichai <
>>> [email protected]>:
>>>
>>>> Hi,
>>>>
>>>> I have a Storm cluster connecting to Kinesis Stream. The message looks
>>>> like this.
>>>>
>>>> {
>>>> _c: "a"
>>>> }
>>>>
>>>> or it should be
>>>>
>>>> {
>>>> _c: "b"
>>>> }
>>>>
>>>> I would like to send a tuple with _c="a" to one bolt and _c="b" to a
>>>> different bolt. How do I achieve this?
>>>>
>>>> This is the bolt that parses the message from Kinesis into a JSON object
>>>> using Gson
>>>>
>>>>     @Override
>>>>     public void execute(Tuple tuple) {
>>>>       String partitionKey = (String) tuple.getValueByField(
>>>>           SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
>>>>       String sequenceNumber = (String) tuple.getValueByField(
>>>>           SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
>>>>       byte[] payload = (byte[]) tuple.getValueByField(
>>>>           SampleKinesisRecordScheme.FIELD_RECORD_DATA);
>>>>
>>>>       ByteBuffer buffer = ByteBuffer.wrap(payload);
>>>>       String data = null;
>>>>       try {
>>>>         data = decoder.decode(buffer).toString();
>>>>
>>>>         HashMap<String, Object> map = new Gson().fromJson(data,
>>>>             new TypeToken<HashMap<String, Object>>() {}.getType());
>>>>
>>>>         this.outputCollector.emit(tuple, new Values(map));
>>>>         this.outputCollector.ack(tuple);
>>>>
>>>>       } catch (CharacterCodingException e) {
>>>>         this.outputCollector.fail(tuple);
>>>>       }
>>>>
>>>>     }
>>>>
>>>> Thanks
>>>>
>>>
>>>
