Hi guys,
I am using the RabbitMQ spout from https://github.com/ppat/storm-rabbitmq
to read data from RabbitMQ. Here is what I see in the console when I run
the topology locally:
101703 [Thread-16-storm-obser-spout] INFO backtype.storm.daemon.executor - Acking message 33
113099 [Thread-16-storm-obser-spout] INFO backtype.storm.daemon.task - Emitting: storm-obser-spout default [{"readings":[0.0,0.0,0.0,88.007,0.0,0.0,0.0,17.83,64.0,0.0,0.0,0.0,0.0,11.071,0.2,1.69,0.0,0.0,-0.23,88.007,0.0,0.0,0.0,0.0,0.0,0.0],"observationId":"f45ef06f-88dc-4d50-b28e-24937dda5240","deploymentId":"http://www.xxxx.org/resource/deployment#aiken_8","deviceId":"0","observationDateTime":"11-05-2014 19:05:15 UTC"}, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@2a8ec790, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@3cfd4c4]
113099 [Thread-16-storm-obser-spout] INFO backtype.storm.daemon.task - Emitting: storm-obser-spout __ack_init [-3424274568001459772 0 2]
Can anybody tell me where these two entries come from:
"io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@2a8ec790,
io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@3cfd4c4" ?
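The ClassName@hex shape looks like Java's default Object.toString() output, so my guess is that these are two extra objects in the emitted tuple rather than text from my message. Below is a minimal sketch that reproduces the same shape. The Envelope and Properties classes here are hypothetical stand-ins that I made up for the demo, not the library's real classes.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultToStringDemo {
    // Hypothetical stand-ins for the two metadata objects seen in the log.
    // Neither overrides toString(), so Object.toString() is used.
    static class Envelope {}
    static class Properties {}

    // Builds a tuple-like list: the payload first, then the two objects.
    static List<Object> buildTuple(String payload) {
        List<Object> values = new ArrayList<Object>();
        values.add(payload);
        values.add(new Envelope());
        values.add(new Properties());
        return values;
    }

    public static void main(String[] args) {
        // List.toString() calls toString() on every element, so the last two
        // elements print as ClassName@hexHashCode.
        System.out.println(buildTuple("{\"deviceId\":\"0\"}"));
    }
}
```

If that guess is right, the hex suffix is just an identity hash code and will differ from run to run.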
Also, how can I parse the individual fields of my JSON-formatted messages,
such as "readings" and "observationDateTime"?
I attached my topology source code.
Thanks.
package io.latent.storm.rabbitmq;

import io.latent.storm.rabbitmq.config.ConnectionConfig;
import io.latent.storm.rabbitmq.config.ConsumerConfig;
import io.latent.storm.rabbitmq.config.ConsumerConfigBuilder;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.Scheme;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;

public class TestRabbitStormTopology {
    public static void main(String[] argv) {
        Scheme irObserPayloadScheme = new IrObserPayloadScheme();
        Declarator stormObserQueue = new StormQueueDeclarator("ir.observations", "observations_storm", "gatewayserver_v2");

        // define scheme
        RabbitMQMessageScheme rabbitscheme = new RabbitMQMessageScheme(irObserPayloadScheme, "irObser", "payload");
        IRichSpout rabbitspout = new RabbitMQSpout(rabbitscheme, stormObserQueue);

        // configuration of rabbitmq: host, port, username, password, virtualhost, heartbeat
        ConnectionConfig connectionConfig = new ConnectionConfig("lbha1.ir.clemson.edu", 5672, "iradmin", "d5rg!r13", "/", 10);
        ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
                .queue("observations_storm")
                .prefetch(256)
                .requeueOnFail()
                .build();

        // build topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("storm-obser-spout", rabbitspout)
                .addConfigurations(spoutConfig.asMap())
                .setMaxSpoutPending(256);

        // set up topology configuration
        Config configTopo = new Config();
        configTopo.setDebug(true);
        configTopo.setMaxTaskParallelism(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("storm-rabbitmq-obser", configTopo, builder.createTopology());
    }
}
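
P.S. To make the parsing question concrete, here is roughly the result I am after, written as a stdlib-only sketch. The class and helper names (ObservationFields, stringField, doubleArrayField) are made up for illustration, and the regular expressions are only a stopgap that assumes flat fields with no escaped quotes. I assume a real bolt would use a proper JSON library instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ObservationFields {
    // Returns the value of a string-valued field such as "observationDateTime",
    // or null if the field is absent. Escaped quotes are not handled.
    static String stringField(String json, String name) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Returns the numbers of a flat numeric array field such as "readings".
    // An absent or empty array yields an empty list.
    static List<Double> doubleArrayField(String json, String name) {
        List<Double> out = new ArrayList<Double>();
        Pattern p = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\\[([^\\]]*)\\]");
        Matcher m = p.matcher(json);
        if (m.find()) {
            for (String part : m.group(1).split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    out.add(Double.parseDouble(trimmed));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Shortened version of the payload shown in the log above.
        String json = "{\"readings\":[0.0,88.007,-0.23],\"deviceId\":\"0\","
                + "\"observationDateTime\":\"11-05-2014 19:05:15 UTC\"}";
        System.out.println(doubleArrayField(json, "readings"));
        System.out.println(stringField(json, "observationDateTime"));
    }
}
```

What I do not know is where in the topology this kind of extraction belongs, for example in the payload scheme or in a downstream bolt.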