Hi Noppanit, Just to make sure we're talking about the same thing, you want to send from OriginBolt tuples where _c="a" to e.g. BoltOne and tuples where _c="b" to BoltTwo, where BoltOne and BoltTwo are two different bolt implementations.
You need to make your bolt define multiple output streams, e.g. "stream1", "stream2". You then need to make it emit _c="a" to "stream1" and _c="b" to "stream2". When you build your topology, you then make BoltOne listen to "stream1" and BoltTwo listen to "stream2". There's some good example code here https://stackoverflow.com/questions/19807395/how-would-i-split-a-stream-in-apache-storm 2017-10-08 18:48 GMT+02:00 Noppanit Charassinvichai <[email protected]>: > Hi, > > I have a Storm cluster connecting to Kinesis Stream. The message looks > like this. > > { > _c: "a" > } > > or it should be > > { > _c: "b" > } > > I would like to send a tuple with _c="a" to one bolt and _c="b" to a > different bolt. How do I achieve this? > > This is the bolt that parsing the message from Kinesis to JSON Object > using GSon > > @Override > public void execute(Tuple tuple) { > String partitionKey = (String) tuple.getValueByField( > SampleKinesisRecordScheme.FIELD_PARTITION_KEY); > String sequenceNumber = (String) tuple.getValueByField( > SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER); > byte[] payload = (byte[]) tuple.getValueByField( > SampleKinesisRecordScheme.FIELD_RECORD_DATA); > > ByteBuffer buffer = ByteBuffer.wrap(payload); > String data = null; > try { > data = decoder.decode(buffer).toString(); > > HashMap < String, String > map = new Gson().fromJson(data, new > TypeToken < HashMap < String, Object >> () {}.getType()); > > this.outputCollector.emit(tuple, new Values(map)); > this.outputCollector.ack(tuple); > > } catch (CharacterCodingException e) { > this.outputCollector.fail(tuple); > } > > } > > Thanks >
