Hi guys,

I am using the RabbitMQ spout from https://github.com/ppat/storm-rabbitmq
to read data from RabbitMQ. Here is what I see in the console when I run
the topology locally:

101703 [Thread-16-storm-obser-spout] INFO  backtype.storm.daemon.executor -
Acking message 33
113099 [Thread-16-storm-obser-spout] INFO  backtype.storm.daemon.task -
Emitting: storm-obser-spout default
[{"readings":[0.0,0.0,0.0,88.007,0.0,0.0,0.0,17.83,64.0,0.0,0.0,0.0,0.0,11.071,0.2,1.69,0.0,0.0,-0.23,88.007,0.0,0.0,0.0,0.0,0.0,0.0],"observationId":"f45ef06f-88dc-4d50-b28e-24937dda5240","deploymentId":"
http://www.xxxx.org/resource/deployment#aiken_8","deviceId":"0","observationDateTime":"11-05-2014
19:05:15 UTC"},
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@2a8ec790,
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@3cfd4c4]
113099 [Thread-16-storm-obser-spout] INFO  backtype.storm.daemon.task -
Emitting: storm-obser-spout __ack_init [-3424274568001459772 0 2]

Can anybody tell me where these two lines come from:
"io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@2a8ec790,
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@3cfd4c4" ?
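
A note on the shape of those two entries: `ClassName$Inner@hex` is what Java's
default Object.toString() produces for an object whose class does not override
toString(). The log shows them as the second and third values of the emitted
tuple, so they are presumably the envelope and properties objects that
RabbitMQMessageScheme places next to the payload. The sketch below only
reproduces the formatting; the Envelope class in it is a hypothetical stand-in,
not the library's class.

```java
public class DefaultToStringDemo {
    // Hypothetical stand-in for a nested class that does not override toString().
    static class Envelope {
        final long deliveryTag;

        Envelope(long deliveryTag) {
            this.deliveryTag = deliveryTag;
        }
    }

    // Renders an object the same way string concatenation or a logger would.
    static String describe(Object o) {
        return String.valueOf(o);
    }

    public static void main(String[] args) {
        Envelope e = new Envelope(33L);
        // Default rendering: class name, '@', then the hash code in hex.
        System.out.println(describe(e));
        // The data itself is still reachable through the object's fields.
        System.out.println(e.deliveryTag);
    }
}
```

Running it prints a line of the form DefaultToStringDemo$Envelope@<hex hash>,
followed by 33. The objects in the tuple are intact; only their printed form is
opaque.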

How can I parse individual fields of my JSON messages, such as "readings" and
"observationDateTime"?
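
One possible way to pull fields out of the payload string is sketched below,
without any Storm or JSON library dependency so that it runs on its own. It
uses regular expressions, which only works for flat fields like the ones in the
log above; in a real bolt a proper JSON library would be the safer choice. The
class and method names (ObservationFieldSketch, stringField, numberArrayField)
are made up for this example, and the sample string is a shortened version of
the logged message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ObservationFieldSketch {
    // Extracts a string-valued field: "name":"value". Escaped quotes are not handled.
    static String stringField(String json, String name) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Extracts a flat numeric array field: "name":[n1,n2,...]. Nested arrays are not handled.
    static List<Double> numberArrayField(String json, String name) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\\[([^\\]]*)\\]");
        Matcher m = p.matcher(json);
        List<Double> out = new ArrayList<Double>();
        if (m.find()) {
            for (String part : m.group(1).split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    out.add(Double.parseDouble(trimmed));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Shortened sample modelled on the message in the log above.
        String json = "{\"readings\":[0.0,88.007,-0.23],\"deviceId\":\"0\","
                + "\"observationDateTime\":\"11-05-2014 19:05:15 UTC\"}";
        System.out.println(numberArrayField(json, "readings"));
        System.out.println(stringField(json, "observationDateTime"));
    }
}
```

In a topology, the same two calls could sit inside a bolt's execute method,
applied to the first value of the incoming tuple, with the results emitted as
separate fields.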

My topology source code is attached below.

Thanks.

package io.latent.storm.rabbitmq;

import io.latent.storm.rabbitmq.config.ConnectionConfig;
import io.latent.storm.rabbitmq.config.ConsumerConfig;
import io.latent.storm.rabbitmq.config.ConsumerConfigBuilder;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.Scheme;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;

public class TestRabbitStormTopology {
	public static void main(String[] argv){
		
		Scheme irObserPayloadScheme = new IrObserPayloadScheme();
		Declarator stormObserQueue = new StormQueueDeclarator("ir.observations", "observations_storm", "gatewayserver_v2");
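		// Define the message scheme. Going by the storm-rabbitmq source, the second and third
		// arguments appear to name the tuple fields for the envelope and the properties.
		RabbitMQMessageScheme rabbitscheme = new RabbitMQMessageScheme(irObserPayloadScheme, "irObser", "payload");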
		IRichSpout rabbitspout = new RabbitMQSpout(rabbitscheme, stormObserQueue);
		
		//configuration of rabbitmq: host, port, username, password, virtualhost, heartbeat
		ConnectionConfig connectionConfig = new ConnectionConfig("lbha1.ir.clemson.edu", 5672, "iradmin","d5rg!r13","/",10);
		ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
                .queue("observations_storm")
                .prefetch(256)
                .requeueOnFail()
                .build();

		//build topology
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("storm-obser-spout", rabbitspout)
		       .addConfigurations(spoutConfig.asMap())
		       .setMaxSpoutPending(256);
		
		//set up topology configuration
		Config configTopo = new Config();
		configTopo.setDebug(true);
		configTopo.setMaxTaskParallelism(3);

		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("storm-rabbitmq-obser", configTopo, builder.createTopology());
	}
}
