I found this: https://issues.apache.org/jira/browse/STORM-1492

"With the default value for nimbus.seeds (["localhost"]) Storm UI may list
one "Offline" nimbus for localhost, and another as "Leader" for the
resolved machine name.
 A workaround is to modify storm.yaml and replace "localhost" with the
hostname of the machine in nimbus.seeds."
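
To see which name would go there, here is a minimal sketch in plain Java (no Storm dependency). NimbusSeedCheck and its two helpers are names I made up for illustration, and I am assuming that the name the JVM reports through InetAddress.getLocalHost() is close to the "resolved machine name" the issue mentions; I have not confirmed that against the Storm source.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Locale;

// Hypothetical helper, not part of Storm.
public class NimbusSeedCheck {

    // True for loopback-style seeds such as the default "localhost".
    public static boolean isLoopbackName(String seed) {
        String s = seed.trim().toLowerCase(Locale.ROOT);
        return s.equals("localhost") || s.equals("127.0.0.1") || s.equals("::1");
    }

    // Formats a single-seed nimbus.seeds entry in the style used in storm.yaml.
    public static String seedsLine(String host) {
        return "nimbus.seeds: [\"" + host + "\"]";
    }

    public static void main(String[] args) throws UnknownHostException {
        // Assumption: this is close to the name Storm UI shows as "Leader".
        String resolved = InetAddress.getLocalHost().getCanonicalHostName();
        System.out.println("Resolved machine name: " + resolved);
        if (isLoopbackName(resolved)) {
            System.out.println("The machine name itself resolves to loopback.");
        }
        System.out.println("Candidate storm.yaml entry: " + seedsLine(resolved));
    }
}
```

It only prints a candidate line; storm.yaml still has to be edited by hand.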

However, when I drop in my hostname, I am no longer able to spin up
workers! Running "storm supervisor" does nothing now.
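
Since the next step is reading logs, here is a rough sketch that builds the log locations Stig describes further down in this thread. WorkerLogPaths is a hypothetical helper. The install directory is taken from the storm.jar path in my topology, and 6700 is only an assumed worker port (the real one should come from Storm UI). The directory under workers-artifacts may also be the full topology id rather than the bare name, which I have not checked.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Storm.
public class WorkerLogPaths {

    // <installDir>/logs/workers-artifacts/<topology>/<port>/worker.log
    public static Path workerLog(String installDir, String topology, int port) {
        return Paths.get(installDir, "logs", "workers-artifacts",
                topology, String.valueOf(port), "worker.log");
    }

    // <installDir>/logs/<daemon>.log, e.g. nimbus.log or supervisor.log
    public static Path daemonLog(String installDir, String daemon) {
        return Paths.get(installDir, "logs", daemon + ".log");
    }

    public static void main(String[] args) {
        // Assumed values; replace with the real install dir, topology and port.
        String installDir = "C:/apache-storm-1.1.1";
        System.out.println(workerLog(installDir, "kafkaTopology", 6700));
        System.out.println(daemonLog(installDir, "nimbus"));
        System.out.println(daemonLog(installDir, "supervisor"));
    }
}
```

If the worker directory is missing entirely, that would fit with the supervisor never launching a worker, so supervisor.log and nimbus.log are probably the ones to read first.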



On Wed, Oct 11, 2017 at 3:42 PM, Ryan Bliton <[email protected]> wrote:

> Yes. Thank you for replying! I've been fussing over it some more and I
> think I'm getting closer to the issue.
>
> In fact, the logs do give a clue: my workers start in state "EMPTY
> -assignment null," do nothing, then get removed after not being used.
> The work isn't even hitting the workers.
>
> Storm UI lists my PC name (ABCD-PC123453.my.company.name) as
> the leader, and localhost as offline.
>
> So, somehow, I must have my nimbus and workers running somewhere
> completely different from the Kafka cluster, which is running on localhost.
>
> I am currently futzing with port numbers in storm.yaml.
>
> How can I bring localhost online as the leader?
>
> On Wed, Oct 11, 2017 at 2:58 PM, Stig Rohde Døssing <[email protected]>
> wrote:
>
>> Hi Ryan,
>>
>> I don't see anything obviously wrong with your configuration. It's likely
>> your topology logs can tell you what's going wrong. Next time you start
>> your topology make note of the topology name in Storm UI. Also click in to
>> your spout in Storm UI and note which worker port(s) it's running on (if
>> you're running on a multi-node cluster you'll also need to note which
>> machine is running the spout). You should then be able to go to
>> $storm-install-dir/logs/workers-artifacts/$your-topology-name-here/$worker-port/worker.log
>> on the relevant worker and see what the spout worker is logging.
>>
>> In case you don't find anything interesting there, you might also look at
>> logs/nimbus.log on the machine running Nimbus and logs/supervisor.log on
>> the machine running the supervisor for those logs.
>>
>> Also just to make sure, you're running "storm supervisor" as well as
>> "storm nimbus", right? Otherwise your topology won't be assigned to a
>> worker.
>>
>> 2017-10-11 16:53 GMT+02:00 Ryan Bliton <[email protected]>:
>>
>>> Hi! I'm trying to get a starter Kafka-Storm integration going. I've got
>>> a simple topology working in local mode. It reads the messages from a Kafka
>>> topic and sends them to a bolt that logs them. However, when I try to
>>> submit the Topology to a cluster, the Storm UI always reads 0 tuples
>>> emitted from the KafkaSpout.
>>>
>>> I've done several laps around the internet at this point, built and
>>> tried different starter projects, and each has the same issue. I can submit
>>> the Topology, but it won't actually work.
>>>
>>> Similar problems to mine seem to come from the Storm /lib and
>>> incompatible .jar files within. I haven't found anything like that in my
>>> case. However, I'm not 100% sure what I should be looking for so I can't
>>> rule it out.
>>>
>>> I don't know how to make code look pretty on a mailing list, so here is
>>> a Stack Overflow question about my issue:
>>>
>>> https://stackoverflow.com/questions/46676377/apache-storm-kafka-cant-see-sent-kafka-messages-in-storm-ui
>>>
>>> I make sure to run "storm supervisor" before testing.
>>>
>>> I have zookeeper running off port 2181.
>>>
>>> I spin up a Kafka broker and use the topic storm-test-topic1.
>>>
>>> I fire up a console Kafka producer to send nonsense messages.
>>>
>>> Storm.yaml:
>>> ########### These MUST be filled in for a storm configuration
>>>  storm.zookeeper.servers:
>>>      - "localhost"
>>> #     - "server2"
>>> #
>>>  nimbus.seeds: ["localhost"]
>>> #
>>> #
>>>
>>> ----------------------------------------------------------------------------------------------
>>> Topology:
>>>
>>> package com.kafka.storm;
>>>
>>> import java.util.HashMap;
>>>
>>> import org.apache.log4j.Logger;
>>> import org.apache.storm.Config;
>>> import org.apache.storm.LocalCluster;
>>> import org.apache.storm.StormSubmitter;
>>> import org.apache.storm.generated.AlreadyAliveException;
>>> import org.apache.storm.generated.AuthorizationException;
>>> import org.apache.storm.generated.InvalidTopologyException;
>>> import org.apache.storm.kafka.BrokerHosts;
>>> import org.apache.storm.kafka.KafkaSpout;
>>> import org.apache.storm.kafka.SpoutConfig;
>>> import org.apache.storm.kafka.StringScheme;
>>> import org.apache.storm.kafka.ZkHosts;
>>> import org.apache.storm.spout.SchemeAsMultiScheme;
>>> import org.apache.storm.topology.TopologyBuilder;
>>>
>>> import com.kafka.storm.bolt.LoggerBolt;
>>>
>>> public class KafkaStormIntegrationDemo {
>>>     private static final Logger LOG = Logger.getLogger(KafkaStormIntegrationDemo.class);
>>>
>>>     public static void main(String[] args) throws InvalidTopologyException,
>>>             AuthorizationException, AlreadyAliveException {
>>>
>>>         // Build Spout configuration using input command line parameters
>>>         final BrokerHosts zkrHosts = new ZkHosts("localhost:2181");
>>>         final String kafkaTopic = "storm-test-topic1";
>>>         final String zkRoot = "";
>>>         final String clientId = "storm-consumer";
>>>         SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
>>>         kafkaConf.startOffsetTime = -2;
>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>
>>>         // Build topology to consume message from kafka and print them on console
>>>         final TopologyBuilder topologyBuilder = new TopologyBuilder();
>>>         // Create KafkaSpout instance using Kafka configuration and add it to topology
>>>         topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
>>>         // Route the output of Kafka Spout to Logger bolt to log messages consumed from Kafka
>>>         topologyBuilder.setBolt("print-messages", new LoggerBolt()).globalGrouping("kafka-spout");
>>>         // Submit topology to local cluster i.e. embedded storm instance in eclipse
>>>         Config conf = new Config();
>>>         System.setProperty("storm.jar", "C://apache-storm-1.1.1/lib/storm-core-1.1.1.jar");
>>>         StormSubmitter.submitTopology("kafkaTopology", conf, topologyBuilder.createTopology());
>>>     }
>>> }
>>> ----------------------------------------------------------------------------------------------
>>>
>>> Bolt:
>>>
>>> package com.kafka.storm.bolt;
>>>
>>> import org.apache.log4j.Logger;
>>> import org.apache.storm.topology.BasicOutputCollector;
>>> import org.apache.storm.topology.OutputFieldsDeclarer;
>>> import org.apache.storm.topology.base.BaseBasicBolt;
>>> import org.apache.storm.tuple.Fields;
>>> import org.apache.storm.tuple.Tuple;
>>>
>>> public class LoggerBolt extends BaseBasicBolt {
>>>     private static final long serialVersionUID = 1L;
>>>     private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
>>>
>>>     public void execute(Tuple input, BasicOutputCollector collector) {
>>>         LOG.info(input.getString(0));
>>>     }
>>>
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("message"));
>>>     }
>>> }
>>>
>>>
>>> Thank you in advance for any help you can give, or for just reading!
>>>
>>>
>>
>
