""message":"Reading reply sessionid:0x34712e79b9204f3, packet::
clientPath:null serverPath:null finished:false header:: 225,4
  replyHeader:: 225,34401287364,0  request:: '/storm/
assignments/OfflineRuleTopology-83-1406827538,T  response::
#ffffffac ......"

This looks like a ZooKeeper log message.

Messages like this are logged even when no tuples are flowing yet.
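
To make the fields in that line easier to read, here is a minimal sketch that pulls them apart. It assumes the layout visible in the message above (header as xid,type; replyHeader as xid,zxid,err; request as a quoted path plus a T/F watch flag). The class name ZkReplyLine, the op-type names (as I recall them from ZooKeeper's ZooDefs.OpCode), and the sample string with the wrapped lines rejoined are assumptions for illustration, not anything taken from Storm or ZooKeeper itself.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ZkReplyLine {
    // Layout assumed from the message above:
    //   header:: <xid>,<op type>  replyHeader:: <xid>,<zxid>,<err>  request:: '<path>,<watch T/F>
    private static final Pattern FIELDS = Pattern.compile(
            "header::\\s*(-?\\d+),(-?\\d+)\\s+"
          + "replyHeader::\\s*(-?\\d+),(-?\\d+),(-?\\d+)\\s+"
          + "request::\\s*'([^,\\s]+),([TF])");

    // Op type numbers as recalled from ZooDefs.OpCode (assumption; only a few are listed).
    public static String opName(int type) {
        switch (type) {
            case 3:  return "exists";
            case 4:  return "getData";
            case 8:  return "getChildren";
            default: return "op " + type;
        }
    }

    /** Summarizes one "Reading reply" line, or returns null if the layout does not match. */
    public static String describe(String line) {
        Matcher m = FIELDS.matcher(line);
        if (!m.find()) {
            return null;
        }
        int type = Integer.parseInt(m.group(2));
        long zxid = Long.parseLong(m.group(4)); // the zxid does not fit in an int
        int err = Integer.parseInt(m.group(5));
        boolean watch = "T".equals(m.group(7));
        return opName(type) + " " + m.group(6)
                + " watch=" + watch + " zxid=" + zxid + " err=" + err;
    }

    public static void main(String[] args) {
        // Sample line with the email line wraps rejoined (assumed to be one continuous line).
        String line = "Reading reply sessionid:0x34712e79b9204f3, packet:: "
                + "clientPath:null serverPath:null finished:false header:: 225,4 "
                + "replyHeader:: 225,34401287364,0  request:: "
                + "'/storm/assignments/OfflineRuleTopology-83-1406827538,T  response:: #ffffffac";
        System.out.println(describe(line));
    }
}
```

Read this way, the line appears to be a getData reply for the topology's assignment node, with a watch set and error code 0, which points at Storm reading its own assignment from ZooKeeper rather than at anything arriving from Kafka.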
--
Derek

On 7/31/14, 13:27, Eugene wrote:
My Trident topology uses kafka-spout to process messages from Kafka.
When I deploy the Storm topology to a cluster, the UI immediately starts
showing that it is processing a large number of tuples, even though I have
not started sending messages to Kafka yet!
In the log file I see a lot of messages like this one:

""message":"Reading reply sessionid:0x34712e79b9204f3, packet::
clientPath:null serverPath:null finished:false header:: 225,4
  replyHeader:: 225,34401287364,0  request:: '/storm/
assignments/OfflineRuleTopology-83-1406827538,T  response::
#ffffffac ......"

What does this message mean?

I tried deleting the topic from Kafka, but it did not help.
My topology is as follows:


BrokerHosts brokerHosts = new ZkHosts("zooas31d-con-08");
TridentTopology topology = new TridentTopology();
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(
         brokerHosts,
         "offline_events",
         "offline_events_client");

spoutConfig.scheme = new SchemeAsMultiScheme(new EventStringScheme());
StateFactory mongodb = new MongoDBStateFactory("cp","tlc")
// should it be transactional?
TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConfig);

Stream inputStream=topology.newStream("offlineEvents", spout)
inputStream.each(new Fields("event"),new getProfile(),new Fields("profile"))
.shuffle()
.each(new Fields("event"),new getTargetList(),new Fields("targetList"))
.each(new Fields("event"),new partitionRules(),new Fields("ruleFile"))
.shuffle()
.each(new Fields("event","profile","targetList","ruleFile"), new executeRule(), new Fields("tacticId","tactic"))
.project(new Fields("tacticId","tactic","event"))
.shuffle()
.each(new Fields("tactic"),new filterAssignedTactic())
.partitionPersist(mongodb, new Fields("tactic","event"), new MongoDBStateUpdater())

Config config = new Config()

config.setDebug(false)
config.setNumAckers(1)
config.setNumWorkers(1)

Map statsdConfig = new HashMap();
statsdConfig.put(StatsdMetricConsumer.STATSD_HOST, "statsd.compute-1.amazonaws.com");
statsdConfig.put(StatsdMetricConsumer.STATSD_PORT, 8125);
statsdConfig.put(StatsdMetricConsumer.STATSD_PREFIX, "storm.metrics.");

Config conf = new Config();
conf.registerMetricsConsumer(StatsdMetricConsumer.class, statsdConfig, 2);



config.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);

     config.setMaxSpoutPending(5)
     config.setMessageTimeoutSecs(60);
     config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 20);
     config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 500);

     StormSubmitter.submitTopology("OfflineRuleTopology", config, topology.build())



Does it have something to do with Kafka/ZooKeeper? What am I missing here?
If it is endlessly reprocessing batches, why don't I see any error
messages, and why do new batches still get processed?



Thanks

Screenshot:
[image: Inline image 1]

