I found this: https://issues.apache.org/jira/browse/STORM-1492

"With the default value for nimbus.seeds (["localhost"]) Storm UI may list one "Offline" nimbus for localhost, and another as "Leader" for the resolved machine name. A workaround is to modify storm.yaml and replace "localhost" with the hostname of the machine in nimbus.seeds."

However, when I drop in my hostname, I am no longer able to spin up workers! "storm supervisor" does nothing now.

On Wed, Oct 11, 2017 at 3:42 PM, Ryan Bliton <[email protected]> wrote:

> Yes. Thank you for replying! I've been fussing over it some more and I
> think I'm getting closer to the issue.
>
> In fact, the logs do give a clue- my workers start in state "EMPTY
> -assignment null," do nothing, then get removed after not being used.
> The work isn't even hitting the workers.
>
> in my Storm UI, it lists my PC name (ABCD-PC123453.my.company.name) as
> the leader, and localhost as offline.
>
> So, somehow, I must have my nimbus and workers running somewhere
> completely different from the Kafka cluster, which are running on localhost.
>
> I am currently futzing with port numbers in storm.yaml.
>
> How can I bring localhost online as the leader?
>
> On Wed, Oct 11, 2017 at 2:58 PM, Stig Rohde Døssing <[email protected]>
> wrote:
>
>> Hi Ryan,
>>
>> I don't see anything obviously wrong with your configuration. It's likely
>> your topology logs can tell you what's going wrong. Next time you start
>> your topology make note of the topology name in Storm UI. Also click in to
>> your spout in Storm UI and note which worker port(s) it's running on (if
>> you're running on a multi-node cluster you'll also need to note which
>> machine is running the spout). You should then be able to go to
>> $storm-install-dir/logs/workers-artifacts/$your-topology-name-here/$worker-port/worker.log
>> on the relevant worker and see what the spout worker is logging.
>>
>> In case you don't find anything interesting there, you might also look at
>> logs/nimbus.log on the machine running Nimbus and logs/supervisor.log on
>> the machine running the supervisor for those logs.
>>
>> Also just to make sure, you're running "storm supervisor" as well as
>> "storm nimbus", right? Otherwise your topology won't be assigned to a
>> worker.
>>
>> 2017-10-11 16:53 GMT+02:00 Ryan Bliton <[email protected]>:
>>
>>> Hi! I'm trying to get a starter Kafka-Storm integration going. I've got
>>> a simple topology working in Local mode- It reads the messages from a Kafka
>>> topic and sends them to a bolt that logs them. However, when I try to
>>> submit the Topology to a cluster, the Storm UI always reads 0 tuples
>>> emitted from the KafkaSpout.
>>>
>>> I've done several laps around the internet at this point, built and
>>> tried different starter projects, and each has the same issue. I can submit
>>> the Topology, but it won't actually work.
>>>
>>> Similar problems to mine seem to come from the Storm /lib and
>>> incompatible .jar files within. I haven't found anything like that in my
>>> case. However, I'm not 100% sure what I should be looking for so I can't
>>> rule it out.
>>>
>>> I don't know how to make code look pretty on a mailing list, so here is
>>> a stack overflow about my issue:
>>>
>>> https://stackoverflow.com/questions/46676377/apache-storm-kafka-cant-see-sent-kafka-messages-in-storm-ui
>>>
>>> I make sure to call storm.supervisor before testing.
>>>
>>> I have zookeeper running off port 2181.
>>>
>>> I spin up a Kafka broker and use the topic storm-test-topic1.
>>>
>>> I fire up a console Kafka producer to send nonsense messages.
>>>
>>> Storm.yaml:
>>> ########### These MUST be filled in for a storm configuration
>>> storm.zookeeper.servers:
>>>     - "localhost"
>>> #     - "server2"
>>> #
>>> nimbus.seeds: ["localhost"]
>>> #
>>> #
>>>
>>> ----------------------------------------------------------------------------------------------
>>> Topology:
>>>
>>> package com.kafka.storm;
>>>
>>> import java.util.HashMap;
>>>
>>> import org.apache.log4j.Logger;
>>> import org.apache.storm.Config;
>>> import org.apache.storm.LocalCluster;
>>> import org.apache.storm.StormSubmitter;
>>> import org.apache.storm.generated.AlreadyAliveException;
>>> import org.apache.storm.generated.AuthorizationException;
>>> import org.apache.storm.generated.InvalidTopologyException;
>>> import org.apache.storm.kafka.BrokerHosts;
>>> import org.apache.storm.kafka.KafkaSpout;
>>> import org.apache.storm.kafka.SpoutConfig;
>>> import org.apache.storm.kafka.StringScheme;
>>> import org.apache.storm.kafka.ZkHosts;
>>> import org.apache.storm.spout.SchemeAsMultiScheme;
>>> import org.apache.storm.topology.TopologyBuilder;
>>>
>>> import com.kafka.storm.bolt.LoggerBolt;
>>>
>>> public class KafkaStormIntegrationDemo {
>>>     private static final Logger LOG = Logger.getLogger(KafkaStormIntegrationDemo.class);
>>>
>>>     public static void main(String[] args) throws InvalidTopologyException,
>>>             AuthorizationException, AlreadyAliveException {
>>>
>>>         // Build Spout configuration using input command line parameters
>>>         final BrokerHosts zkrHosts = new ZkHosts("localhost:2181");
>>>         final String kafkaTopic = "storm-test-topic1";
>>>         final String zkRoot = "";
>>>         final String clientId = "storm-consumer";
>>>         SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
>>>         kafkaConf.startOffsetTime = -2;
>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>
>>>         // Build topology to consume message from kafka and print them on console
>>>         final TopologyBuilder topologyBuilder = new TopologyBuilder();
>>>         // Create KafkaSpout instance using Kafka configuration and add it to topology
>>>         topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
>>>         //Route the output of Kafka Spout to Logger bolt to log messages consumed from Kafka
>>>         topologyBuilder.setBolt("print-messages", new LoggerBolt()).globalGrouping("kafka-spout");
>>>         // Submit topology to local cluster i.e. embedded storm instance in eclipse
>>>         Config conf = new Config();
>>>         System.setProperty("storm.jar","C://apache-storm-1.1.1/lib/storm-core-1.1.1.jar");
>>>         StormSubmitter.submitTopology("kafkaTopology", conf, topologyBuilder.createTopology());
>>>     }
>>> }
>>> ----------------------------------------------------------------------------------------------
>>>
>>> Bolt:
>>>
>>> package com.kafka.storm.bolt;
>>>
>>> import org.apache.log4j.Logger;
>>> import org.apache.storm.topology.BasicOutputCollector;
>>> import org.apache.storm.topology.OutputFieldsDeclarer;
>>> import org.apache.storm.topology.base.BaseBasicBolt;
>>> import org.apache.storm.tuple.Fields;
>>> import org.apache.storm.tuple.Tuple;
>>>
>>> public class LoggerBolt extends BaseBasicBolt{
>>>     private static final long serialVersionUID = 1L;
>>>     private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
>>>
>>>     public void execute(Tuple input, BasicOutputCollector collector) {
>>>         LOG.info(input.getString(0));
>>>     }
>>>
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("message"));
>>>     }
>>> }
>>>
>>>
>>> thank you in advance for any help you can give, or for just reading!
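
For reference, here is a minimal sketch of the storm.yaml change that the STORM-1492 workaround quoted at the top describes, as I read it. The hostname is the one Storm UI lists as leader in this thread; using it for "the hostname of the machine" is an assumption, and this is not a confirmed fix, since "storm supervisor" stopped starting workers after the change.

```yaml
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
    - "localhost"

# Assumption: the machine name Storm UI reports as "Leader" replaces "localhost".
nimbus.seeds: ["ABCD-PC123453.my.company.name"]
```

Both "storm nimbus" and "storm supervisor" read storm.yaml at startup, so they would need a restart after the edit.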
