Hi,

I have a Storm cluster reading from a Kinesis stream. A message looks like
this:

{
_c: "a"
}

or like this:

{
_c: "b"
}

I would like to send a tuple with _c="a" to one bolt and _c="b" to a
different bolt. How do I achieve this?
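
To make the goal concrete, here is a rough sketch of the routing decision I
have in mind, written as plain Java with no Storm dependency. The class name
StreamRouter and the stream names "stream_a" and "stream_b" are made up for
illustration. My assumption, which I am not sure is the right approach, is
that the parsing bolt could declare two named streams with declareStream(),
emit to the chosen one, and each downstream bolt could subscribe to only one
of them.

```java
import java.util.Map;

public class StreamRouter {
    // Hypothetical stream names, invented for this sketch.
    public static final String STREAM_A = "stream_a";
    public static final String STREAM_B = "stream_b";

    // Pick a stream name from the "_c" field of a parsed message.
    // Returns null when "_c" is missing or has an unexpected value.
    public static String streamFor(Map<String, ?> message) {
        Object c = message.get("_c");
        if ("a".equals(c)) {
            return STREAM_A;
        }
        if ("b".equals(c)) {
            return STREAM_B;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(streamFor(Map.of("_c", "a")));
        System.out.println(streamFor(Map.of("_c", "b")));
    }
}
```

Inside execute() the result would presumably be passed as the stream id, as
in outputCollector.emit(streamId, tuple, new Values(map)), with the two
downstream bolts wired up via shuffleGrouping(componentId, streamId).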

This is the bolt that parses the Kinesis message into a JSON object using
Gson:

    @Override
    public void execute(Tuple tuple) {
      // Partition key and sequence number are read but not used further here.
      String partitionKey = (String) tuple.getValueByField(
          SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
      String sequenceNumber = (String) tuple.getValueByField(
          SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
      byte[] payload = (byte[]) tuple.getValueByField(
          SampleKinesisRecordScheme.FIELD_RECORD_DATA);

      ByteBuffer buffer = ByteBuffer.wrap(payload);
      try {
        // Decode the raw record bytes into a string.
        String data = decoder.decode(buffer).toString();

        // Parse the JSON text into a map of field name to value.
        HashMap<String, Object> map = new Gson().fromJson(data,
            new TypeToken<HashMap<String, Object>>() {}.getType());

        // Emit anchored to the input tuple, then ack it.
        this.outputCollector.emit(tuple, new Values(map));
        this.outputCollector.ack(tuple);
      } catch (CharacterCodingException e) {
        // The payload could not be decoded; fail the tuple so it is replayed.
        this.outputCollector.fail(tuple);
      }
    }

Thanks
