Yay, I fixed it, after days of trying to find the problem! I will leave the
solution here. The problem was actually in the Spout and was related to
serialization.
The Spout was the one converting the message to JSON, and the Bolt converted
from JSON to Java. I changed the Spout to emit a plain String and the Bolt to
do the whole conversion: String -> JSON -> Java objects.
So, transferring raw Java types (here a String) between components worked.
My guess is therefore that the problem was related to the serialization of
objects between components: it was failing, so the Tuple value was being sent
as a default, empty value (""). Strangely enough, I couldn't find any
exceptions in the logs.
I guess that with only one JVM serialization was not in use, because tuples
never had to leave the process, or the classes were somehow resolving the
types on their own.
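
For anyone hitting the same thing, here is a minimal sketch of the pattern
described above. It deliberately uses no Storm classes so it can run on its
own. All names in it (StringPayloadSketch, DataPoint, spoutPayload,
boltConvert, longField) are made up for illustration, and the small regex
helper is only a stand-in for a real JSON library. The point it shows is that
only a String crosses the component boundary, and the JSON/Java conversion
happens entirely on the Bolt side.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StringPayloadSketch {

    /** Hypothetical value object, built locally in the bolt; it is never sent between components. */
    static final class DataPoint {
        final long value;
        final long sourceTimestamp;

        DataPoint(long value, long sourceTimestamp) {
            this.value = value;
            this.sourceTimestamp = sourceTimestamp;
        }
    }

    /** Spout side (assumption): hand the raw message body downstream as a plain String. */
    static String spoutPayload(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    /** Bolt side (assumption): String -> JSON fields -> Java object, all inside the bolt. */
    static DataPoint boltConvert(String payload) {
        return new DataPoint(longField(payload, "value"),
                             longField(payload, "sourceTimestamp"));
    }

    /** Stand-in for a JSON parser: extracts one integer field from a flat JSON object. */
    static long longField(String json, String name) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*(-?\\d+)");
        Matcher m = p.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("missing field: " + name);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) {
        // Sample body taken from the debug output quoted below.
        byte[] body = "{\"value\":16,\"sourceTimestamp\":1413905723923}"
                .getBytes(StandardCharsets.UTF_8);
        DataPoint point = boltConvert(spoutPayload(body));
        System.out.println(point.value + " @ " + point.sourceTimestamp);
    }
}
```

In the real topology, spoutPayload would correspond to what the Spout emits
and boltConvert to the body of execute(), with a proper JSON parser in place
of longField.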
2014-10-22 10:48 GMT+02:00 Itai Frenkel <[email protected]>:
> Does the code work correctly when the same worker runs more than one
> Spout ?
> ------------------------------
> *From:* Hugo Sequeira <[email protected]>
> *Sent:* Wednesday, October 22, 2014 11:13 AM
> *To:* [email protected]
> *Subject:* Tuples turn empty after rebalacing
>
> Hi all,
>
> I have a really strange behaviour on my topology.
>
> a) I have a Spout that reads messages from RMQ and sends them to a
> DataPointConverterBolt, which converts them to JSON etc.
> This is the bolt's code:
>
> public class DataPointConverterBolt extends BaseBasicBolt {
>
>     ...
>
>     @Override
>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>
>         synchronized (collector) {
>
>             if (tuple.contains("object")) {
>                 LOGGER.error("TUPLE:");
>                 LOGGER.error(tuple.getValues().toString());
>
>                 if (tuple.getValueByField("object") instanceof JSONObject) {
>
>                     JSONObject message = (JSONObject) tuple
>                             .getValueByField("object");
>
>                     ... more JSON operations
>
> b) Now, if I deploy the topology with 1 worker everything works well and
> a typical debug is something like this:
>
> ... other threads/executors output ...
> c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
> c.a.e.b.DataPointConverterBolt [ERROR]
> [*{"value":16,"sourceTimestamp":1413905723923}*,
> io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@6bad662f,
> io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@450fcee3]
> ... other threads/executors output ...
> c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
> c.a.e.b.DataPointConverterBolt [ERROR]
> [*{"value":21,"sourceTimestamp":14139057238641000}*,
> io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@325f046d,
> io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@7c1ba9f0]
> ... etc
>
> c) Then, if I rebalance the topology to 2 or 3 workers (JVMs), the
> bolt with this task starts receiving only empty tuples from the spout:
>
> c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
> c.a.e.b.DataPointConverterBolt [ERROR] [*{}*,
> io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@4b6abe5a,
> io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@4cf72d64]
>
> c.a.e.b.DataPointConverterBolt [ERROR] TUPLE:
> c.a.e.b.DataPointConverterBolt [ERROR] [*{}*,
> io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@77c527b5,
> io.latent.storm.rabbitmq.
>
> ....
>
> Any thoughts on what can produce these empty tuples?
>
> Thank you for your support
> Best regards
> Hugo
>
>
>
>