you are right, fixing the display bug did not affect the cluster at all.
Thank you again for taking the time. I'm just about ready to give up, I've
been working on this all week.

I have Kafka, Zookeeper, Nimbus, supervisor, and UI all running in
localhost, I am not connecting to any other machines.

I called Supervisor again- the UI registers that there are executors for a
minute and then they go away having done nothing. Here are the logs of
Nimbus and Supervisor around the time I called storm supervisor.

In the logs I swapped my actual PC's name with [MY PC NAME] before posting.

Supervisor log:

2017-10-11 17:26:20.552 o.a.s.z.Zookeeper main [INFO] Staring ZK Curator
2017-10-11 17:26:20.563 o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl main [INFO]
Starting
2017-10-11 17:26:20.579 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-10-11 17:26:20.580 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:host.name=[MY PC NAME]
2017-10-11 17:26:20.580 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:java.version=1.8.0_131
2017-10-11 17:26:20.581 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:java.vendor=Oracle Corporation
2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:java.home=C:\JAVA\jdk1.8.0_131\jre
2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:java.class.path=C:\apache-storm-1.1.1\*;C:\apache-storm-1.1.1\conf;C:\JAVA\jdk1.8.0_131\lib\tools.jar;C:\apache-storm-1.1.1\lib\asm-5.0.3.jar;C:\apache-storm-1.1.1\lib\clojure-1.7.0.jar;C:\apache-storm-1.1.1\lib\disruptor-3.3.2.jar;C:\apache-storm-1.1.1\lib\kryo-3.0.3.jar;C:\apache-storm-1.1.1\lib\log4j-api-2.8.2.jar;C:\apache-storm-1.1.1\lib\log4j-core-2.8.2.jar;C:\apache-storm-1.1.1\lib\log4j-over-slf4j-1.6.6.jar;C:\apache-storm-1.1.1\lib\log4j-slf4j-impl-2.8.2.jar;C:\apache-storm-1.1.1\lib\minlog-1.3.0.jar;C:\apache-storm-1.1.1\lib\objenesis-2.1.jar;C:\apache-storm-1.1.1\lib\reflectasm-1.10.1.jar;C:\apache-storm-1.1.1\lib\ring-cors-0.1.5.jar;C:\apache-storm-1.1.1\lib\servlet-api-2.5.jar;C:\apache-storm-1.1.1\lib\slf4j-api-1.7.21.jar;C:\apache-storm-1.1.1\lib\storm-core-1.1.1.jar;C:\apache-storm-1.1.1\lib\storm-rename-hack-1.1.1.jar
2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib;C:\JAVA\jdk1.8.0_131\bin;C:\JAVA\jdk1.8.0_131\lib;C:\JAVA\jdk1.8.0_131\jre\bin;C:\JAVA\jdk1.8.0_131\jre\lib
2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:java.io.tmpdir=C:\Users\rbliton\AppData\Local\Temp\
2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:java.compiler=<NA>
2017-10-11 17:26:20.582 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:os.name=Windows 10
2017-10-11 17:26:20.583 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:os.arch=amd64
2017-10-11 17:26:20.583 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:os.version=10.0
2017-10-11 17:26:20.583 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:user.name=rbliton
2017-10-11 17:26:20.583 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:user.home=C:\Users\rbliton
2017-10-11 17:26:20.583 o.a.s.s.o.a.z.ZooKeeper main [INFO] Client
environment:user.dir=C:\apache-storm-1.1.1\bin
2017-10-11 17:26:20.585 o.a.s.s.o.a.z.ZooKeeper main [INFO] Initiating
client connection, connectString=localhost:2181 sessionTimeout=20000
watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@2216effc
2017-10-11 17:26:20.801 o.a.s.s.o.a.z.ClientCnxn main-SendThread(
127.0.0.1:2181) [INFO] Opening socket connection to server
127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL
(unknown error)
2017-10-11 17:26:20.803 o.a.s.s.o.a.z.ClientCnxn main-SendThread(
127.0.0.1:2181) [INFO] Socket connection established to
127.0.0.1/127.0.0.1:2181, initiating session
2017-10-11 17:26:20.826 o.a.s.s.o.a.z.ClientCnxn main-SendThread(
127.0.0.1:2181) [INFO] Session establishment complete on server
127.0.0.1/127.0.0.1:2181, sessionid = 0x15f0d3b46a70016, negotiated timeout
= 20000
2017-10-11 17:26:20.845 o.a.s.s.o.a.c.f.s.ConnectionStateManager
main-EventThread [INFO] State change: CONNECTED
2017-10-11 17:26:20.856 o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl
Curator-Framework-0 [INFO] backgroundOperationsLoop exiting
2017-10-11 17:26:20.867 o.a.s.s.o.a.z.ZooKeeper main [INFO] Session:
0x15f0d3b46a70016 closed
2017-10-11 17:26:20.869 o.a.s.z.Zookeeper main [INFO] Staring ZK Curator
2017-10-11 17:26:20.870 o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl main [INFO]
Starting
2017-10-11 17:26:20.869 o.a.s.s.o.a.z.ClientCnxn main-EventThread [INFO]
EventThread shut down
2017-10-11 17:26:20.870 o.a.s.s.o.a.z.ZooKeeper main [INFO] Initiating
client connection, connectString=localhost:2181/storm sessionTimeout=20000
watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@4417af13
2017-10-11 17:26:20.880 o.a.s.s.o.a.z.ClientCnxn
main-SendThread(0:0:0:0:0:0:0:1:2181) [INFO] Opening socket connection to
server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181. Will not attempt to
authenticate using SASL (unknown error)
2017-10-11 17:26:20.881 o.a.s.s.o.a.z.ClientCnxn
main-SendThread(0:0:0:0:0:0:0:1:2181) [INFO] Socket connection established
to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181, initiating session
2017-10-11 17:26:20.932 o.a.s.s.o.a.z.ClientCnxn
main-SendThread(0:0:0:0:0:0:0:1:2181) [INFO] Session establishment complete
on server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181, sessionid =
0x15f0d3b46a70017, negotiated timeout = 20000
2017-10-11 17:26:20.933 o.a.s.s.o.a.c.f.s.ConnectionStateManager
main-EventThread [INFO] State change: CONNECTED
2017-10-11 17:26:20.952 o.a.s.l.Localizer main [INFO] Reconstruct localized
resource: C:\apache-storm-1.1.1\storm-local\supervisor\usercache
2017-10-11 17:26:20.954 o.a.s.l.Localizer main [WARN] No left over
resources found for any user during reconstructing of local resources at:
C:\apache-storm-1.1.1\storm-local\supervisor\usercache
2017-10-11 17:26:20.962 o.a.s.d.s.Supervisor main [INFO] Starting
supervisor for storm version '1.1.1'.
2017-10-11 17:26:21.005 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6700
Starting in state EMPTY - assignment null
2017-10-11 17:26:21.006 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6701
Starting in state EMPTY - assignment null
2017-10-11 17:26:21.006 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6701
Starting in state EMPTY - assignment null
2017-10-11 17:26:21.009 o.a.s.d.s.Slot main [WARN] SLOT [MY PC NAME]:6703
Starting in state EMPTY - assignment null
2017-10-11 17:26:21.020 o.a.s.d.s.Container main [INFO] GET worker-user for
964b34c1-d7e3-4f22-9ee2-a0664c10294e
2017-10-11 17:26:21.022 o.a.s.d.s.Container main [INFO] Cleaning up
5ef774ff-f24c-4810-9330-5e1291bd7539:964b34c1-d7e3-4f22-9ee2-a0664c10294e
2017-10-11 17:26:21.023 o.a.s.d.s.Container main [INFO] GET worker-user for
964b34c1-d7e3-4f22-9ee2-a0664c10294e
2017-10-11 17:26:21.024 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-4f22-9ee2-a0664c10294e\heartbeats
2017-10-11 17:26:21.029 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-4f22-9ee2-a0664c10294e\pids
2017-10-11 17:26:21.032 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-4f22-9ee2-a0664c10294e\tmp
2017-10-11 17:26:21.034 o.a.s.d.s.AdvancedFSOps main [INFO] Deleting path
C:\apache-storm-1.1.1\storm-local\workers\964b34c1-d7e3-4f22-9ee2-a0664c10294e

Nimbus logs:

2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[2 2] not alive
2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[3 3] not alive
2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[1 1] not alive
2017-10-11 17:26:22.189 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[4 4] not alive
2017-10-11 17:26:22.190 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[2 2] not alive
2017-10-11 17:26:22.190 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[3 3] not alive
2017-10-11 17:26:22.190 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[1 1] not alive
2017-10-11 17:26:22.192 o.a.s.s.EvenScheduler timer [INFO] Available slots:
(["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6702]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6703])
2017-10-11 17:26:22.192 o.a.s.s.EvenScheduler timer [INFO] Available slots:
(["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6702]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6703])
2017-10-11 17:26:22.195 o.a.s.d.nimbus timer [INFO] Setting new assignment
for topology id storm-kafka-topology1-2-1507756303:
#org.apache.storm.daemon.common.Assignment{:master-code-dir "storm-local",
:node->host {"5ef774ff-f24c-4810-9330-5e1291bd7539" "
LNAR-PC0611K6.corp.capgemini.com"}, :executor->node+port {[4 4]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701], [3 3]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701], [2 2]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701], [1 1]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701]}, :executor->start-time-secs
{[1 1] 1507757182, [2 2] 1507757182, [3 3] 1507757182, [4 4] 1507757182},
:worker->resources {["5ef774ff-f24c-4810-9330-5e1291bd7539" 6701] [0.0 0.0
0.0]}, :owner "rbliton"}
2017-10-11 17:26:22.201 o.a.s.d.nimbus timer [INFO] Setting new assignment
for topology id kafkaTopology-1-1507755826:
#org.apache.storm.daemon.common.Assignment{:master-code-dir "storm-local",
:node->host {"5ef774ff-f24c-4810-9330-5e1291bd7539" "
LNAR-PC0611K6.corp.capgemini.com"}, :executor->node+port {[2 2]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700], [1 1]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700], [3 3]
["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700]}, :executor->start-time-secs
{[1 1] 1507757182, [2 2] 1507757182, [3 3] 1507757182}, :worker->resources
{["5ef774ff-f24c-4810-9330-5e1291bd7539" 6700] [0.0 0.0 0.0]}, :owner
"rbliton"}
2017-10-11 17:26:23.209 o.a.s.d.nimbus pool-14-thread-25 [INFO] Created
download session for kafkaTopology-1-1507755826-stormjar.jar with id
bed36ea8-1d05-4d80-8006-c2bfb40ed8f6
2017-10-11 17:26:23.801 o.a.s.d.nimbus pool-14-thread-7 [INFO] Created
download session for kafkaTopology-1-1507755826-stormcode.ser with id
7227688a-4381-48d5-ac27-c6a6e03f5a5d
2017-10-11 17:26:23.837 o.a.s.d.nimbus pool-14-thread-11 [INFO] Created
download session for kafkaTopology-1-1507755826-stormconf.ser with id
d4d6712b-55ec-46fe-8e9c-6df068d65133
2017-10-11 17:26:23.913 o.a.s.d.nimbus pool-14-thread-20 [INFO] Created
download session for storm-kafka-topology1-2-1507756303-stormjar.jar with
id 3b7263a0-fd39-4caf-b90b-3deaf3999308
2017-10-11 17:26:24.496 o.a.s.d.nimbus pool-14-thread-64 [INFO] Created
download session for storm-kafka-topology1-2-1507756303-stormcode.ser with
id cd88098a-22c7-44c3-b709-34a202bc4952
2017-10-11 17:26:24.529 o.a.s.d.nimbus pool-14-thread-63 [INFO] Created
download session for storm-kafka-topology1-2-1507756303-stormconf.ser with
id e7478249-7082-4edb-93bb-3746f39db06e
2017-10-11 17:28:29.931 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[2 2] not alive
2017-10-11 17:28:29.931 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[3 3] not alive
2017-10-11 17:28:29.931 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[1 1] not alive
2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
storm-kafka-topology1-2-1507756303:[4 4] not alive
2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[2 2] not alive
2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[3 3] not alive
2017-10-11 17:28:29.932 o.a.s.d.nimbus timer [INFO] Executor
kafkaTopology-1-1507755826:[1 1] not alive
2017-10-11 17:28:29.936 o.a.s.d.nimbus timer [INFO] Setting new assignment
for topology id storm-kafka-topology1-2-1507756303:
#org.apache.storm.daemon.common.Assignment{:master-code-dir "storm-local",
:node->host {}, :executor->node+port {}, :executor->start-time-secs {[1 1]
1507757182, [2 2] 1507757182, [3 3] 1507757182, [4 4] 1507757182},
:worker->resources {}, :owner "rbliton"}
2017-10-11 17:28:29.943 o.a.s.d.nimbus timer [INFO] Setting new assignment
for topology id kafkaTopology-1-1507755826:
#org.apache.storm.daemon.common.Assignment{:master-code-dir "storm-local",
:node->host {}, :executor->node+port {}, :executor->start-time-secs {[1 1]
1507757182, [2 2] 1507757182, [3 3] 1507757182}, :worker->resources {},
:owner "rbliton"}


On Wed, Oct 11, 2017 at 4:56 PM, Stig Rohde Døssing <[email protected]> wrote:

> As far as I'm aware 1492 is a display bug only. It shouldn't affect how
> your cluster works.
>
> If I'm understanding correctly you have Kafka, Zookeeper, Nimbus, one
> supervisor and UI running on localhost, and there are no other machines
> involved, right?
>
> Can you post your nimbus and supervisor logs?
>
> 2017-10-11 22:10 GMT+02:00 Ryan Bliton <[email protected]>:
>
>> just one quick update: fixed storm supervisor- that was something I did
>> while messing with ports.
>>
>> the workers are still not being put to work however.
>>
>> On Wed, Oct 11, 2017 at 3:58 PM, Ryan Bliton <[email protected]>
>> wrote:
>>
>>> I found this :https://issues.apache.org/jira/browse/STORM-1492
>>>
>>> "With the default value for nimbus.seeds (["localhost"]) Storm UI may
>>> list one "Offline" nimbus for localhost, and another as "Leader" for the
>>> resolved machine name.
>>>  A workaround is to modify storm.yaml and replace "localhost" with the
>>> hostname of the machine in nimbus.seeds."
>>>
>>> However, when I drop in my hostname, I am no longer able to spin up
>>> workers! storm supervisor does nothing now.
>>>
>>>
>>>
>>> On Wed, Oct 11, 2017 at 3:42 PM, Ryan Bliton <[email protected]>
>>> wrote:
>>>
>>>> Yes. Thank you for replying! I've been fussing over it some more and I
>>>> think I'm getting closer to the issue.
>>>>
>>>> In fact, the logs do give a clue- my workers start in state "EMPTY
>>>> -assignment null," do nothing, then get removed after not being used.
>>>> The work isn't even hitting the workers.
>>>>
>>>> in my Storm UI, it lists my PC name (ABCD-PC123453.my.company.name) as
>>>> the leader, and localhost as offline.
>>>>
>>>> So, somehow, I must have my nimbus and workers running somewhere
>>>> completely different from the Kafka cluster, which are running on 
>>>> localhost.
>>>>
>>>> I am currently futzing with port numbers in storm.yaml.
>>>>
>>>> How can I bring localhost online as the leader?
>>>>
>>>> On Wed, Oct 11, 2017 at 2:58 PM, Stig Rohde Døssing <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> I don't see anything obviously wrong with your configuration. It's
>>>>> likely your topology logs can tell you what's going wrong. Next time you
>>>>> start your topology make note of the topology name in Storm UI. Also click
>>>>> in to your spout in Storm UI and note which worker port(s) it's running on
>>>>> (if you're running on a multi-node cluster you'll also need to note which
>>>>> machine is running the spout). You should then be able to go to
>>>>> $storm-install-dir/logs/workers-artifacts/$your-topology-name-here/$worker-port/worker.log
>>>>> on the relevant worker and see what the spout worker is logging.
>>>>>
>>>>> In case you don't find anything interesting there, you might also look
>>>>> at logs/nimbus.log on the machine running Nimbus and logs/supervisor.log 
>>>>> on
>>>>> the machine running the supervisor for those logs.
>>>>>
>>>>> Also just to make sure, you're running "storm supervisor" as well as
>>>>> "storm nimbus", right? Otherwise your topology won't be assigned to a
>>>>> worker.
>>>>>
>>>>> 2017-10-11 16:53 GMT+02:00 Ryan Bliton <[email protected]>:
>>>>>
>>>>>> Hi! I'm trying to get a starter Kafka-Storm integration going. I've
>>>>>> got a simple topology working in Local mode- It reads the messages from a
>>>>>> Kafka topic and sends them to a bolt that logs them. However, when I try 
>>>>>> to
>>>>>> submit the Topology to a cluster, the Storm UI always reads 0 tuples
>>>>>> emitted from the KafkaSpout.
>>>>>>
>>>>>> I've done several laps around the internet at this point, built and
>>>>>> tried different starter projects, and each has the same issue. I can 
>>>>>> submit
>>>>>> the Topology, but it won't actually work.
>>>>>>
>>>>>> Similar problems to mine seem to come from the Storm /lib and
>>>>>> incompatible .jar files within. I haven't found anything like that in my
>>>>>> case. However, I'm not 100% sure what I should be looking for so I can't
>>>>>> rule it out.
>>>>>>
>>>>>> I don't know how to make code look pretty on a mailing list, so here
>>>>>> is a stack overflow about my issue:
>>>>>>
>>>>>> https://stackoverflow.com/questions/46676377/apache-storm-ka
>>>>>> fka-cant-see-sent-kafka-messages-in-storm-ui
>>>>>>
>>>>>> I make sure to call storm.supervisor before testing.
>>>>>>
>>>>>> I have zookeeper running off port 2181.
>>>>>>
>>>>>> I spin up a Kafka broker and use the topic storm-test-topic1.
>>>>>>
>>>>>> I fire up a console Kafka producer to send nonsense messages.
>>>>>>
>>>>>> Storm.yaml:
>>>>>> ########### These MUST be filled in for a storm configuration
>>>>>>  storm.zookeeper.servers:
>>>>>>      - "localhost"
>>>>>> #     - "server2"
>>>>>> #
>>>>>>  nimbus.seeds: ["localhost"]
>>>>>> #
>>>>>> #
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>> ----------------------------------
>>>>>> Topology:
>>>>>>
>>>>>> package com.kafka.storm;
>>>>>>
>>>>>> import java.util.HashMap;
>>>>>>
>>>>>> import org.apache.log4j.Logger;
>>>>>> import org.apache.storm.Config;
>>>>>> import org.apache.storm.LocalCluster;
>>>>>> import org.apache.storm.StormSubmitter;
>>>>>> import org.apache.storm.generated.AlreadyAliveException;
>>>>>> import org.apache.storm.generated.AuthorizationException;
>>>>>> import org.apache.storm.generated.InvalidTopologyException;
>>>>>> import org.apache.storm.kafka.BrokerHosts;
>>>>>> import org.apache.storm.kafka.KafkaSpout;
>>>>>> import org.apache.storm.kafka.SpoutConfig;
>>>>>> import org.apache.storm.kafka.StringScheme;
>>>>>> import org.apache.storm.kafka.ZkHosts;
>>>>>> import org.apache.storm.spout.SchemeAsMultiScheme;
>>>>>> import org.apache.storm.topology.TopologyBuilder;
>>>>>>
>>>>>> import com.kafka.storm.bolt.LoggerBolt;
>>>>>>
>>>>>> public class KafkaStormIntegrationDemo {
>>>>>> private static final Logger LOG = Logger.getLogger(KafkaStormInt
>>>>>> egrationDemo.class);
>>>>>>
>>>>>> public static void main(String[] args) throws
>>>>>> InvalidTopologyException, AuthorizationException, AlreadyAliveException {
>>>>>>
>>>>>> // Build Spout configuration using input command line parameters
>>>>>> final BrokerHosts zkrHosts = new ZkHosts("localhost:2181");
>>>>>> final String kafkaTopic = "storm-test-topic1";
>>>>>> final String zkRoot = "";
>>>>>> final String clientId = "storm-consumer";
>>>>>> SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot,
>>>>>> clientId);
>>>>>> kafkaConf.startOffsetTime = -2;
>>>>>> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>
>>>>>> // Build topology to consume message from kafka and print them on
>>>>>> console
>>>>>> final TopologyBuilder topologyBuilder = new TopologyBuilder();
>>>>>> // Create KafkaSpout instance using Kafka configuration and add it to
>>>>>> topology
>>>>>> topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf),
>>>>>> 1);
>>>>>> //Route the output of Kafka Spout to Logger bolt to log messages
>>>>>> consumed from Kafka
>>>>>> topologyBuilder.setBolt("print-messages", new
>>>>>> LoggerBolt()).globalGrouping("kafka-spout");
>>>>>> // Submit topology to local cluster i.e. embedded storm instance in
>>>>>> eclipse
>>>>>> Config conf = new Config();
>>>>>> System.setProperty("storm.jar","C://apache-storm-1.1.1/lib/s
>>>>>> torm-core-1.1.1.jar");
>>>>>> StormSubmitter.submitTopology("kafkaTopology", conf,
>>>>>> topologyBuilder.createTopology());
>>>>>> }
>>>>>> }
>>>>>> ------------------------------------------------------------
>>>>>> ----------------------------------
>>>>>>
>>>>>> Bolt:
>>>>>>
>>>>>> package com.kafka.storm.bolt;
>>>>>>
>>>>>> import org.apache.log4j.Logger;
>>>>>> import org.apache.storm.topology.BasicOutputCollector;
>>>>>> import org.apache.storm.topology.OutputFieldsDeclarer;
>>>>>> import org.apache.storm.topology.base.BaseBasicBolt;
>>>>>> import org.apache.storm.tuple.Fields;
>>>>>> import org.apache.storm.tuple.Tuple;
>>>>>>
>>>>>> public class LoggerBolt extends BaseBasicBolt{
>>>>>> private static final long serialVersionUID = 1L;
>>>>>> private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
>>>>>>
>>>>>> public void execute(Tuple input, BasicOutputCollector collector) {
>>>>>> LOG.info(input.getString(0));
>>>>>> }
>>>>>>
>>>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>> declarer.declare(new Fields("message"));
>>>>>> }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> thank you in advance for any help you can give, or for just reading!
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to