Hi,
I have a Storm cluster that reads from a Kinesis stream. Each message looks
like this:
{
_c: "a"
}
or like this:
{
_c: "b"
}
I would like to send a tuple with _c="a" to one bolt and _c="b" to a
different bolt. How do I achieve this?
This is the bolt that parses the message from Kinesis into a JSON object
using Gson:
@Override
public void execute(Tuple tuple) {
    String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
    String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
    byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    try {
        // Decode the raw record bytes into a string, then parse it as JSON.
        String data = decoder.decode(buffer).toString();
        // The declared type now matches the TypeToken (values are Object, not String).
        HashMap<String, Object> map = new Gson().fromJson(data,
                new TypeToken<HashMap<String, Object>>() {}.getType());
        // Anchor the emitted tuple to the input tuple, then ack the input.
        this.outputCollector.emit(tuple, new Values(map));
        this.outputCollector.ack(tuple);
    } catch (CharacterCodingException | JsonSyntaxException e) {
        // Fail the tuple if the payload cannot be decoded or is not valid JSON.
        this.outputCollector.fail(tuple);
    }
}
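For reference, here is a minimal sketch of one possible way to do this kind of
routing, assuming Storm's named streams are used: the parsing bolt emits each
tuple on a stream chosen from the value of _c, and each downstream bolt
subscribes to only one of those streams. The class name StreamRouter, the
stream ids, the component ids "parser" and "bolt-a", the class BoltA, and the
field name "message" are all hypothetical. Only the stream selection is
runnable here; the Storm wiring is shown in comments and is untested.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: picks a Storm stream id from the parsed message.
class StreamRouter {
    // Hypothetical stream ids; any unique strings would do.
    static final String STREAM_A = "stream-a";
    static final String STREAM_B = "stream-b";
    static final String STREAM_UNKNOWN = "stream-unknown";

    // Returns the stream id for a message based on its "_c" value.
    static String streamFor(Map<String, Object> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) {
            return STREAM_A;
        }
        if ("b".equals(c)) {
            return STREAM_B;
        }
        // Missing or unexpected "_c" values go to a separate stream.
        return STREAM_UNKNOWN;
    }

    // Assumed wiring on the Storm side (not compiled here):
    //
    //   In the parsing bolt's execute():
    //     outputCollector.emit(StreamRouter.streamFor(map), tuple, new Values(map));
    //
    //   In the parsing bolt's declareOutputFields():
    //     declarer.declareStream(StreamRouter.STREAM_A, new Fields("message"));
    //     declarer.declareStream(StreamRouter.STREAM_B, new Fields("message"));
    //     declarer.declareStream(StreamRouter.STREAM_UNKNOWN, new Fields("message"));
    //
    //   In the topology definition:
    //     builder.setBolt("bolt-a", new BoltA()).shuffleGrouping("parser", StreamRouter.STREAM_A);

    public static void main(String[] args) {
        Map<String, Object> message = new HashMap<>();
        message.put("_c", "a");
        System.out.println(streamFor(message));
    }
}
```

With this layout a bolt only receives tuples from the streams it subscribes
to, so the bolt handling _c="a" never sees the _c="b" tuples.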
Thanks