Hello
I'm reading data from Kafka formatted as a Protobuf object (it comes out as a
byte[]).
This works fine and I can read and decode the data. But when I try to push it
back to the queue, a KafkaBolt declared without any type parameters seems to
require a String object that then gets encoded:

    KafkaBolt kafkaBolt = new KafkaBolt()
        .withTopicSelector(new DefaultTopicSelector("topic"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

If I try to apply type parameters to the constructor like this:

    KafkaBolt kafkaBolt = new KafkaBolt<String, byte[]>()
        .withTopicSelector(new DefaultTopicSelector("topic"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, byte[]>());

It still seems to expect a String to decode.

I tried to encode the data as a
string in the previous bolt with:

    byte[] byteArray = input.getBinary(0);
    String output;
    try {
        output = new String(byteArray, "ISO-8859-1");
        _collector.emit(new Values(this.topic, output, "key"));
    } catch (...) {}

Now the KafkaBolt is able to send to the queue, but it fails on some objects,
and the Protobuf decoder on the other side spits out lots of errors. I used
ISO-8859-1 because I read it is a 1-to-1 mapping from binary, but I have no
idea what encoding the KafkaBolt uses on the other side, and that could
certainly be the problem.
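
To check the ISO-8859-1 assumption in isolation, here is a minimal sketch using
only the JDK (no Storm or Kafka involved; the class and method names are made up
for illustration). It suggests the round trip is lossless only when both sides
use ISO-8859-1. Re-encoding the same String as UTF-8, which is only my guess at
what a String encoder on the producer side might do, changes every byte at or
above 0x80:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Latin1RoundTrip {

    // Decode the bytes as ISO-8859-1 and encode them again with the same charset.
    static byte[] latin1RoundTrip(byte[] raw) {
        String asText = new String(raw, StandardCharsets.ISO_8859_1);
        return asText.getBytes(StandardCharsets.ISO_8859_1);
    }

    // Decode as ISO-8859-1 but encode as UTF-8 (assumed charset mismatch).
    static byte[] latin1ThenUtf8(byte[] raw) {
        String asText = new String(raw, StandardCharsets.ISO_8859_1);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Sample payload containing a byte >= 0x80, as Protobuf varints often do.
        byte[] raw = {0x08, (byte) 0x96, 0x01};

        // Same charset on both sides: the bytes survive unchanged.
        System.out.println(Arrays.equals(raw, latin1RoundTrip(raw))); // prints "true"

        // Mismatched charsets: 0x96 becomes a two-byte UTF-8 sequence.
        System.out.println(latin1ThenUtf8(raw).length); // prints "4"
    }
}
```

If that is what happens inside the bolt, it would explain why only some objects
fail: messages whose bytes all stay below 0x80 would pass through untouched.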
Ideally I want to avoid the String encoding / decoding altogether, so how do I
specify the type of the 'message' to the KafkaBolt?
What am I doing wrong?

Thanks for the help.
Regards