Hi Noppanit,

Just to make sure we're talking about the same thing: you want OriginBolt to
send tuples where _c="a" to e.g. BoltOne and tuples where _c="b" to BoltTwo,
where BoltOne and BoltTwo are two different bolt implementations.

You need to make your bolt declare multiple output streams, e.g. "stream1"
and "stream2", in declareOutputFields. You then need to make it emit tuples
with _c="a" to "stream1" and tuples with _c="b" to "stream2". When you build
your topology, you then make BoltOne subscribe to "stream1" and BoltTwo
subscribe to "stream2".
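
As a rough sketch of the routing decision described above (not a definitive
implementation): StreamRouter is a hypothetical helper, not part of Storm, and
the field name "message", the component ids "originBolt", "boltOne" and
"boltTwo", and the choice of shuffleGrouping are all assumptions for
illustration. The Storm calls are shown only as comments so that the snippet
compiles without Storm on the classpath.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not part of Storm): picks the output stream id
// for a parsed message based on its "_c" value.
final class StreamRouter {
    static final String STREAM_A = "stream1"; // intended for BoltOne
    static final String STREAM_B = "stream2"; // intended for BoltTwo

    private StreamRouter() {}

    // Returns the stream id for the message, or empty if "_c" is missing
    // or holds a value other than "a" or "b".
    static Optional<String> streamFor(Map<String, ?> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) return Optional.of(STREAM_A);
        if ("b".equals(c)) return Optional.of(STREAM_B);
        return Optional.empty();
    }
}

// Where this would plug into Storm (assumed names, shown as comments):
//
// OriginBolt.declareOutputFields(OutputFieldsDeclarer declarer):
//   declarer.declareStream(StreamRouter.STREAM_A, new Fields("message"));
//   declarer.declareStream(StreamRouter.STREAM_B, new Fields("message"));
//
// OriginBolt.execute(Tuple tuple), after parsing the payload into map:
//   StreamRouter.streamFor(map).ifPresent(
//       streamId -> outputCollector.emit(streamId, tuple, new Values(map)));
//   outputCollector.ack(tuple);
//
// Topology wiring:
//   builder.setBolt("boltOne", new BoltOne())
//          .shuffleGrouping("originBolt", StreamRouter.STREAM_A);
//   builder.setBolt("boltTwo", new BoltTwo())
//          .shuffleGrouping("originBolt", StreamRouter.STREAM_B);
```

With this sketch, a message whose _c is neither "a" nor "b" is emitted to no
stream and simply acked; whether you would rather fail it or send it to a
third stream is up to you.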

There's some good example code here:
https://stackoverflow.com/questions/19807395/how-would-i-split-a-stream-in-apache-storm

2017-10-08 18:48 GMT+02:00 Noppanit Charassinvichai <[email protected]>:

> Hi,
>
> I have a Storm cluster connecting to Kinesis Stream. The message looks
> like this.
>
> {
> _c: "a"
> }
>
> or like this:
>
> {
> _c: "b"
> }
>
> I would like to send a tuple with _c="a" to one bolt and _c="b" to a
> different bolt. How do I achieve this?
>
> This is the bolt that parses the message from Kinesis into a JSON object
> using Gson:
>
>     @Override
>     public void execute(Tuple tuple) {
>       String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
>       String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
>       byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);
>
>       ByteBuffer buffer = ByteBuffer.wrap(payload);
>       String data = null;
>       try {
>         data = decoder.decode(buffer).toString();
>
>         // The declared value type must match the TypeToken (Object, not String).
>         HashMap<String, Object> map = new Gson().fromJson(data,
>             new TypeToken<HashMap<String, Object>>() {}.getType());
>
>         this.outputCollector.emit(tuple, new Values(map));
>         this.outputCollector.ack(tuple);
>
>       } catch (CharacterCodingException e) {
>         this.outputCollector.fail(tuple);
>       }
>
>     }
>
> Thanks
>
