Thanks. I'll try that.

On Sun, 8 Oct 2017 at 13:40 Stig Rohde Døssing <[email protected]> wrote:

> Yes, though you might want to use collector.emit(streamid, tuple, value)
> instead of collector.emit(streamid, value) so Storm keeps tracking whether
> the tuple is acked downstream.
>
> Also you might consider extending
> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/topology/base/BaseBasicBolt.html
> instead of BaseRichBolt for bolts that just receive a tuple, do some
> processing and immediately emit one or more new tuples. BaseBasicBolt takes
> care of acking and anchoring for you, so it can make your code a little
> simpler.
>
> 2017-10-08 19:35 GMT+02:00 Noppanit Charassinvichai <[email protected]>:
>
>> I have a follow-up question to the one on Stack Overflow.
>> Do I need a condition in my bolt to choose between the two streams?
>>
>> For example
>>
>> if (tuple.getStringByField("_c").equals("a")) {
>>   collector.emit("stream1", new Values("field1Value"));
>> } else {
>>   collector.emit("stream2", new Values("field1Value"));
>> }
>>
>>
>> On Sun, 8 Oct 2017 at 13:28 Noppanit Charassinvichai <
>> [email protected]> wrote:
>>
>>> Thanks for the reply.
>>>
>>> Yes that's what I wanted to achieve.
>>>
>>> On Sun, 8 Oct 2017 at 13:26 Stig Rohde Døssing <[email protected]> wrote:
>>>
>>>> Hi Noppanit,
>>>>
>>>> Just to make sure we're talking about the same thing, you want to send
>>>> from OriginBolt tuples where _c="a" to e.g. BoltOne and tuples where _c="b"
>>>> to BoltTwo, where BoltOne and BoltTwo are two different bolt
>>>> implementations.
>>>>
>>>> You need to make your bolt define multiple output streams, e.g.
>>>> "stream1", "stream2". You then need to make it emit _c="a" to "stream1" and
>>>> _c="b" to "stream2". When you build your topology, you then make BoltOne
>>>> listen to "stream1" and BoltTwo listen to "stream2".
>>>>
>>>> There's some good example code here
>>>> https://stackoverflow.com/questions/19807395/how-would-i-split-a-stream-in-apache-storm
>>>>
>>>> 2017-10-08 18:48 GMT+02:00 Noppanit Charassinvichai <
>>>> [email protected]>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a Storm cluster connected to a Kinesis stream. A message looks
>>>>> like this:
>>>>>
>>>>> {
>>>>> _c: "a"
>>>>> }
>>>>>
>>>>> or it could be
>>>>>
>>>>> {
>>>>> _c: "b"
>>>>> }
>>>>>
>>>>> I would like to send a tuple with _c="a" to one bolt and _c="b" to a
>>>>> different bolt. How do I achieve this?
>>>>>
>>>>> This is the bolt that parses the message from Kinesis into a JSON
>>>>> object using Gson:
>>>>>
>>>>>     @Override
>>>>>     public void execute(Tuple tuple) {
>>>>>       String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
>>>>>       String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
>>>>>       byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);
>>>>>
>>>>>       ByteBuffer buffer = ByteBuffer.wrap(payload);
>>>>>       try {
>>>>>         // Decode the raw Kinesis record bytes into a JSON string.
>>>>>         String data = decoder.decode(buffer).toString();
>>>>>
>>>>>         // Parse the JSON into a map; the declared type matches the TypeToken.
>>>>>         HashMap<String, Object> map = new Gson().fromJson(data,
>>>>>             new TypeToken<HashMap<String, Object>>() {}.getType());
>>>>>
>>>>>         // Emit anchored to the input tuple, then ack it.
>>>>>         this.outputCollector.emit(tuple, new Values(map));
>>>>>         this.outputCollector.ack(tuple);
>>>>>       } catch (CharacterCodingException e) {
>>>>>         // The payload could not be decoded: fail the tuple.
>>>>>         this.outputCollector.fail(tuple);
>>>>>       }
>>>>>     }
>>>>>
>>>>> Thanks
>>>>>
>>>>
>>>>
>
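
For anyone finding this thread later, here is a minimal sketch of the routing decision discussed above, written so it runs without a Storm dependency. The class name StreamRouter and the Emitter interface are hypothetical stand-ins, not Storm APIs; in a real bolt the emit call would be collector.emit(streamId, tuple, values) on Storm's OutputCollector, the two streams would be declared with declareStream in declareOutputFields, and BoltOne and BoltTwo would subscribe with shuffleGrouping(componentId, streamId) when the topology is built. Skipping messages whose _c is neither "a" nor "b" is an assumption; the thread does not say what should happen to them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StreamRouter {
    /** Hypothetical stand-in for the emit call on Storm's OutputCollector. */
    public interface Emitter {
        void emit(String streamId, List<Object> values);
    }

    // Stream ids as named in the thread.
    public static final String STREAM_A = "stream1";
    public static final String STREAM_B = "stream2";

    /** Returns the stream id for a parsed message, or null if _c is neither "a" nor "b". */
    public static String streamFor(Map<String, Object> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) {
            return STREAM_A;
        }
        if ("b".equals(c)) {
            return STREAM_B;
        }
        return null; // assumption: unknown values are not routed anywhere
    }

    /** Emits the message on the matching stream; returns false if it was not routed. */
    public static boolean route(Map<String, Object> message, Emitter emitter) {
        String streamId = streamFor(message);
        if (streamId == null) {
            return false;
        }
        List<Object> values = new ArrayList<>();
        values.add(message);
        emitter.emit(streamId, values);
        return true;
    }

    public static void main(String[] args) {
        // Print each emit instead of sending it into a topology.
        Emitter printer = (streamId, values) -> System.out.println(streamId + " <- " + values);
        route(Map.of("_c", "a"), printer);
        route(Map.of("_c", "b"), printer);
    }
}
```

Keeping the decision in one small method means the bolt's execute only has to look up the stream id and emit, and the condition can be unit tested without starting a cluster.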
