Hi all,
I have a really strange behaviour on my topology.
a) I have a Spout that is reading messages from RMQ and sends those to a
DataPointConverterBolt to convert it in JSON etc.
This is its code:
public class DataPointConverterBolt extends BaseBasicBolt {
...
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
synchronized (collector) {
if (tuple.contains("object")) {
LOGGER.error("TUPLE:");
LOGGER.error(tuple.getValues().toString());
if (tuple.getValueByField("object") instanceof JSONObject) {
JSONObject message = (JSONObject) tuple
.getValueByField("object");
... more JSON operations
b) Now, if I deploy the topology with 1 worker everything works well and a
typical debug is something like this:
... other threads/executors output ...
c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
c.a.e.b.DataPointConverterBolt [ERROR]
[*{"value":16,"sourceTimestamp":1413905723923}*,
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@6bad662f,
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@450fcee3]
... other threads/executors output ...
c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
c.a.e.b.DataPointConverterBolt [ERROR]
[*{"value":21,"sourceTimestamp":14139057238641000}*,
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@325f046d,
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@7c1ba9f0]
... etc
c) Then, if I rebalance the topology to 2 or 3 workers (JVMs) then the bolt
with this task starts receiving only empty tuples from the spout:
c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
c.a.e.b.DataPointConverterBolt [ERROR] [*{}*,
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@4b6abe5a,
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@4cf72d64]
c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
c.a.e.b.DataPointConverterBolt [ERROR] [*{}*,
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@77c527b5,
io.latent.storm.rabbitmq.
....
Any thoughts on what can produce this empty tuples?
Thank you for your support
Best regards
Hugo