I'm sorry, I made a mistake.

ZkHosts zkhost = new ZkHosts("localhost:2181", "/brokers"); // "/brokers" --> Kafka broker path (Kafka's Zookeeper)
SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout"); // "/brokers" --> offset root (Storm's Zookeeper)
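
To make the second use of the path concrete, here is a minimal sketch (my own illustration, not the storm-kafka source) of how the offset node path appears to be composed, judging only from the paths shown in this thread: the zkRoot, then the spout id, then partition_<n>. The class name OffsetPath and the method committedPath are hypothetical names made up for this example.

```java
public class OffsetPath {
    // Assumed layout, inferred from the paths in this thread:
    // <zkRoot>/<spoutId>/partition_<n>
    static String committedPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // zkRoot and id as passed to SpoutConfig in my example
        System.out.println(committedPath("/brokers", "kafka-spout", 0));
        // zkRoot and id as used in Benjamin's topology
        System.out.println(committedPath("/kafkastorm/warehouse_prices", "rawWarehousePriceSpout", 2));
    }
}
```

With zkRoot "/brokers" and id "kafka-spout" this gives /brokers/kafka-spout/partition_0, so that is the node to look for in Storm's Zookeeper after the topology has committed an offset.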

If you look, that is not an ERROR, it is an INFO message. It usually appears when the directory structure does not exist yet.

Take a look at Kafka's Zookeeper (port 2181)
----------------------------------

bin/zookeeper-shell.sh localhost:2181

ls /brokers/topics/bid_history/partitions/0/state
[]
get /brokers/topics/bid_history/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
cZxid = 0x113
ctime = Tue May 19 18:19:24 BRT 2015
mZxid = 0x113
mtime = Tue May 19 18:19:24 BRT 2015
pZxid = 0x113
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0

Take a look at Storm's Zookeeper (port 2000)
----------------------------------

Start the topology, send a message and then look:

bin/zookeeper-shell.sh localhost:2000
Connecting to localhost:2000
Welcome to ZooKeeper!
JLine support is disabled

ls /
[storm, brokers, zookeeper]
ls /brokers
[kafka-spout]
ls /brokers/kafka-spout
[partition_0]
ls /brokers/kafka-spout/partition_0
[]
get /brokers/kafka-spout/partition_0
{"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"}
cZxid = 0x50
ctime = Thu May 21 11:00:48 BRT 2015
mZxid = 0x50
mtime = Thu May 21 11:00:48 BRT 2015
pZxid = 0x50
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 182
numChildren = 0

2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:

> Bec I do see this error in the storm logs
>
> 5304 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d76b1ea54000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /brokers/rawWarehousePriceSpout
>
> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>
> You are wrong about
>
> zookeeperPath ,
> // the root path in Zookeeper for the spout to store the consumer offsets
>
> That is for the KafkaSpout to find broker meta information (topics, partitions).
>
> You have to override
>
> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout");
> spoutconfig.zkServers
> spoutconfig.zkPort
>
> Take a look at KafkaSpout.class and step through this part in debug mode.
>
> Map stateConf = new HashMap(conf);
> List<String> zkServers = _spoutConfig.zkServers;
> if (zkServers == null) {
>     zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
> }
> Integer zkPort = _spoutConfig.zkPort;
> if (zkPort == null) {
>     zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
> }
>
> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>
>> So when I run it out of eclipse in local mode I am pointing to the same zookeeper as I am when it is deployed. Either way it still does not write the offset.
>>
>> The only difference I have is that my spout is configured like so:
>>
>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>>
>> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
>> SpoutConfig spoutConfig = new SpoutConfig(
>>     hosts,
>>     topic, // topic to read from
>>     zookeeperPath, // the root path in Zookeeper for the spout to store the consumer offsets
>>     spoutId); // an id for this consumer for storing the consumer offsets in Zookeeper
>>
>> //Check if we should be consuming messages from the beginning
>> spoutConfig.forceFromStart = consumeFromBeginning;
>> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>> spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaAnalyticsMessageDecoder());
>>
>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>
>> I have the same problem when I have Storm in Local cluster mode,
>> but that is because Local mode has an embedded Zookeeper, and every time it restarts the offset is null. That is the reason you have
>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null.
>>
>> Try to run Local Mode, send some messages to the spout, and then look at the offset in Zookeeper (port 2000).
>>
>> Or use Kafka's Zookeeper by overriding:
>>
>> spoutconfig.zkServers
>> spoutconfig.zkPort
>>
>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>
>>> Thanks,
>>>
>>> Yup, kafka is creating them on startup when the topic gets its first msg. And from below, the storm logs are never really committing the offset to zookeeper.
>>>
>>> Zookeeper kafka topics details
>>>
>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls /brokers/topics/warehouse_prices/partitions
>>> [2, 1, 0]
>>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>>
>>> Zookeeper Storm
>>>
>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>> []
>>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>>
>>> Storm logs
>>>
>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null
>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 0
>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1 --> null
>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset 0
>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null
>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset 0
>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d7658db3c000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>>>
>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>
>>> Zookeeper creates nothing at startup; you have to create your partitions in the Kafka broker.
>>>
>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic click_history --replication-factor 1 --partitions 10
>>>
>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>
>>>> All,
>>>>
>>>> We changed our paths in zookeeper and we are now seeing
>>>>
>>>> java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/warehouse_prices/partitions
>>>> at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>> at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>> at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>> at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>> at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
>>>> at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
>>>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>>
>>>> Should this not be discovered by the Spout on startup?