I have a follow-up question after reading the Stack Overflow question.
Do I need a condition in my bolt to choose between the two streams?

For example

if ("a".equals(tuple.getValueByField("_c"))) {
  collector.emit("stream1", new Values("field1Value"));
} else {
  collector.emit("stream2", new Values("field1Value"));
}
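
To make the condition concrete, here is a minimal sketch of the routing decision on its own, written without the Storm classes so it runs standalone. The class name StreamRouter and the method streamFor are hypothetical names used only for illustration, and the sketch assumes the parsed message is a Map with a "_c" key, as in the original bolt further down the thread.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper: isolates the "which stream?" decision from the bolt.
public class StreamRouter {
    static final String STREAM_A = "stream1";     // intended for messages with _c = "a"
    static final String STREAM_OTHER = "stream2"; // everything else, as in the else branch

    // Returns the stream id for a parsed message.
    // A missing or non-"a" value for "_c" falls through to STREAM_OTHER.
    static String streamFor(Map<String, Object> message) {
        return "a".equals(message.get("_c")) ? STREAM_A : STREAM_OTHER;
    }

    public static void main(String[] args) {
        Map<String, Object> a = Collections.singletonMap("_c", "a");
        Map<String, Object> b = Collections.singletonMap("_c", "b");
        System.out.println(streamFor(a));
        System.out.println(streamFor(b));
        // Inside a bolt, the result would be passed as the stream id, e.g.
        // collector.emit(streamFor(map), tuple, new Values(map));
    }
}
```

In the bolt itself, both stream ids would presumably also have to be declared in declareOutputFields with declarer.declareStream(...), and each downstream bolt would subscribe to one of them, for example with shuffleGrouping(componentId, streamId), as described in the reply quoted below.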


On Sun, 8 Oct 2017 at 13:28 Noppanit Charassinvichai <[email protected]>
wrote:

> Thanks for the reply.
>
> Yes that's what I wanted to achieve.
>
> On Sun, 8 Oct 2017 at 13:26 Stig Rohde Døssing <[email protected]> wrote:
>
>> Hi Noppanit,
>>
>> Just to make sure we're talking about the same thing, you want to send
>> from OriginBolt tuples where _c="a" to e.g. BoltOne and tuples where _c="b"
>> to BoltTwo, where BoltOne and BoltTwo are two different bolt
>> implementations.
>>
>> You need to make your bolt define multiple output streams, e.g.
>> "stream1", "stream2". You then need to make it emit _c="a" to "stream1" and
>> _c="b" to "stream2". When you build your topology, you then make BoltOne
>> listen to "stream1" and BoltTwo listen to "stream2".
>>
>> There's some good example code here
>> https://stackoverflow.com/questions/19807395/how-would-i-split-a-stream-in-apache-storm
>>
>> 2017-10-08 18:48 GMT+02:00 Noppanit Charassinvichai <[email protected]>:
>>
>>> Hi,
>>>
>>> I have a Storm cluster connecting to Kinesis Stream. The message looks
>>> like this.
>>>
>>> {
>>> _c: "a"
>>> }
>>>
>>> or it could be
>>>
>>> {
>>> _c: "b"
>>> }
>>>
>>> I would like to send a tuple with _c="a" to one bolt and _c="b" to a
>>> different bolt. How do I achieve this?
>>>
>>> This is the bolt that parses the message from Kinesis into a JSON
>>> object using Gson:
>>>
>>>     @Override
>>>     public void execute(Tuple tuple) {
>>>       String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
>>>       String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
>>>       byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);
>>>
>>>       ByteBuffer buffer = ByteBuffer.wrap(payload);
>>>       String data = null;
>>>       try {
>>>         data = decoder.decode(buffer).toString();
>>>
>>>         // The declared type now matches the TypeToken (values are Objects, not Strings).
>>>         HashMap<String, Object> map = new Gson().fromJson(data, new TypeToken<HashMap<String, Object>>() {}.getType());
>>>
>>>         this.outputCollector.emit(tuple, new Values(map));
>>>         this.outputCollector.ack(tuple);
>>>
>>>       } catch (CharacterCodingException e) {
>>>         this.outputCollector.fail(tuple);
>>>       }
>>>
>>>     }
>>>
>>> Thanks
>>>
>>
>>
