Could this be the issue you guys are facing?
https://issues.apache.org/jira/browse/STORM-1027

FYI, the above can happen for non-Kafka sources as well.
On Mon, Sep 21, 2015 at 11:39 PM, Onur Yalazı <[email protected]> wrote:

> I think we have a similar issue. We are using beanstalkd as a message
> source, so it's not Kafka-related in our case.
>
> We normally have 30 MB/s of traffic between nodes, and some logs record a
> few durations of the topology. Whenever the topology freezes, traffic drops
> to 200 KB/s, complete latency drops drastically, and the fail count goes to
> zero. We see only a fraction of our duration logs, and the service stops
> working.
>
> Any ideas?
>
> On Sep 21, 2015 9:01 PM, "Andrey Yegorov" <[email protected]> wrote:
>
>> Hi Alex,
>>
>> Can you share how you solved/worked around the problem?
>> I hit something similar and would appreciate any suggestions on how to
>> deal with it.
>> Thank you in advance.
>>
>> ----------
>> Andrey Yegorov
>>
>> On Thu, Jul 23, 2015 at 4:43 AM, Alex Sobrino <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> We've got a pretty simple topology running with Storm 0.9.5 (also tried
>>> with 0.9.4 and 0.9.6-INCUBATING) in a 3-machine cluster:
>>>
>>> kafkaSpout (3) -----> processBolt (12)
>>>
>>> Some info:
>>> - kafkaSpout reads from a topic with 3 partitions and 2 replicas
>>> - processBolt iterates through the message and saves the results in
>>>   MongoDB
>>> - processBolt is implemented in Python and has a storm.log("I'm doing
>>>   something") just to add a simple debug message in the logs
>>> - The messages can be quite big (~25-40 MB) and are in JSON format
>>> - The Kafka topic has a retention of 2 hours
>>> - We use the same ZooKeeper cluster for both Kafka and Storm
>>>
>>> The topology freezes after several hours (not days) of running. We
>>> don't see any messages in the logs... In fact, the periodic messages from
>>> s.k.KafkaUtils and s.k.ZkCoordinator disappear. As you can imagine, the
>>> messages from the bolt also disappear. The logs are copy/pasted further
>>> on. If we redeploy the topology, everything starts to work again until it
>>> becomes frozen again.
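[Editor's note: a rough back-of-the-envelope check on the numbers quoted above. The message sizes (~25-40 MB) and the 50 MB fetch size are the thread's own figures, not measurements:]

```python
# Back-of-the-envelope check: how many worst-case messages fit in one
# Kafka fetch? Numbers are taken from the thread (25-40 MB messages,
# fetchSizeBytes = 50 MB), not measured.
MB = 1024 * 1024
fetch_size_bytes = 50 * MB   # kafkaConfig.fetchSizeBytes from the config
max_message_bytes = 40 * MB  # upper end of the reported message size

messages_per_fetch = fetch_size_bytes // max_message_bytes
print(messages_per_fetch)  # -> 1
```

So a single fetch round-trip can hold at most one maximum-size message, which makes the spout's progress very sensitive to any hiccup in the fetch path.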
>>>
>>> Our kafkaSpout config is:
>>>
>>>     ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
>>>     SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic", "/topic/ourclientid", "ourclientid");
>>>     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>     kafkaConfig.fetchSizeBytes = 50*1024*1024;
>>>     kafkaConfig.bufferSizeBytes = 50*1024*1024;
>>>
>>> We've also tried setting the following options:
>>>
>>>     kafkaConfig.forceFromStart = true;
>>>     kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // Also with kafka.api.OffsetRequest.LatestTime();
>>>     kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>>
>>> Right now the topology is running without acking the messages, since
>>> there's a bug in kafkaSpout with failed messages and deleted offsets in
>>> Kafka.
>>>
>>> This is what can be seen in the logs of one of the workers:
>>>
>>> 2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>> 2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>> 2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>> 2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>> 2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>> 2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>> 2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2, partition=1}]
>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>> 2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>> 2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>> 2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>> 2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>> 2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>> 2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>> 2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>> 2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>> 2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>> 2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>> 2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>> 2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>> 2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>> 2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>
>>> ...and then it becomes frozen. Nothing is written into the nimbus log.
>>> We've checked the offsets in ZooKeeper and they're not updated:
>>>
>>> {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
>>> cZxid = 0x100028958
>>> ctime = Wed Jul 01 12:22:36 CEST 2015
>>> mZxid = 0x100518527
>>> mtime = Thu Jul 23 12:42:41 CEST 2015
>>> pZxid = 0x100028958
>>> cversion = 0
>>> dataVersion = 446913
>>> aclVersion = 0
>>> ephemeralOwner = 0x0
>>> dataLength = 183
>>> numChildren = 0
>>>
>>> Any ideas of what we could be missing?
>>> Should we open a Jira issue?
>>>
>>> Thanks!
>>>
>>> --
>>> Alex Sobrino Beltrán
>>> Registered Linux User #273657
>>>
>>> http://v5tech.es

--
Regards,
Abhishek Agarwal
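[Editor's note: one way to confirm such a freeze from outside Storm is to poll the spout's commit znode and check whether "offset" advances between polls. A minimal sketch that decodes the payload format dumped earlier in the thread; actually reading the znode (e.g. with zkCli.sh or a ZooKeeper client library) is left out:]

```python
import json

# Payload copied verbatim from the znode dump in the thread. The spout
# commits consumer progress here, so an "offset" that stays constant
# across polls while traffic should be flowing means the spout has
# stopped committing.
payload = (
    '{"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1",'
    '"name":"our-topology"},"offset":12047144,"partition":1,'
    '"broker":{"host":"kafka2","port":9092},"topic":"topic"}'
)

state = json.loads(payload)
print(state["offset"], state["partition"], state["broker"]["host"])
# -> 12047144 1 kafka2
```

Comparing this value (or the znode's mtime/dataVersion from the stat output) across two polls a minute apart is enough to alert on a wedged topology.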
