In such situations, it's better to take a jstack and see where things are stuck. @Andrey - Does your spout emit multiple tuples in a single nextTuple call?
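For context on that question, here is a minimal sketch of the kind of spout behaviour it is probing: a nextTuple() that emits at most one tuple per call and backs off when there is nothing to do. This is not the original spout; the class, queue and field names are hypothetical. A nextTuple() that loops and emits many tuples in one call keeps the spout executor thread busy, so acks and fails queue up behind it, which is the kind of stall a jstack of the worker would make visible.

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

// Hypothetical spout: emits at most one tuple per nextTuple() call.
public class SingleEmitSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // Placeholder for whatever feeds the spout (Kafka, beanstalkd, ...).
    private final Queue<String> pending = new ConcurrentLinkedQueue<String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String msg = pending.poll();
        if (msg == null) {
            Utils.sleep(50);   // back off instead of spinning
            return;
        }
        // One emit per call keeps the executor loop responsive to acks/fails.
        collector.emit(new Values(msg), msg);  // second arg = message id for ack tracking
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}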
On Tue, Sep 22, 2015 at 11:35 PM, Jitendra Yadav <[email protected]> wrote:

> We had faced a similar kind of issue in the past. After several days of
> research we found out that the topology was freezing after 5-6 minutes
> due to a code issue: we were trying to acknowledge an unanchored tuple in
> one of our bolts, which doesn't actually make sense, but it is a weird
> situation.
>
> Thanks
> Jitendra
>
> On Tue, Sep 22, 2015 at 8:34 AM, Andrey Yegorov <[email protected]> wrote:
>
>> FWIW, I am not using multilang.
>>
>> The topology is written in Java; I am using Storm 0.9.4.
>>
>> ----------
>> Andrey Yegorov
>>
>> On Tue, Sep 22, 2015 at 4:20 AM, Alex Sobrino <[email protected]> wrote:
>>
>>> Hi guys,
>>>
>>> What we've seen so far is that it's *not* a KafkaSpout-only issue: it's
>>> related to how Storm and the multilang protocol are implemented in Python.
>>>
>>> If the bolt's process() execution time is long enough, the corresponding
>>> heartbeat tuples time out. That leaves you in a situation where your bolt
>>> is ready to process another tuple but Storm thinks it had some kind of
>>> problem. If I remember correctly, Storm "hangs" waiting for the multilang
>>> process's error to be printed on stderr. That will never happen, and thus
>>> the topology seems to be hung.
>>>
>>> Storm thinks the multilang process had a problem and is waiting for a
>>> stderr message, but the multilang process is waiting for another tuple...
>>> I think this issue covers the problem:
>>> https://issues.apache.org/jira/browse/STORM-738
>>>
>>> What we've done in our projects is either:
>>> 1) Rewrite the bolts in Java
>>> 2) Fall back to Storm 0.9.2, where the multilang protocol didn't have to
>>> handle heartbeat tuples
>>>
>>> Hope this helps...
>>>
>>> On Tue, Sep 22, 2015 at 1:05 PM, Abhishek Agarwal <[email protected]> wrote:
>>>
>>>> Could this be the issue you guys are facing?
>>>> https://issues.apache.org/jira/browse/STORM-1027
>>>> FYI, the above can happen for non-Kafka sources as well.
>>>>
>>>> On Mon, Sep 21, 2015 at 11:39 PM, Onur Yalazı <[email protected]> wrote:
>>>>
>>>>> I think we have a similar issue. We are using beanstalkd as a message
>>>>> source, so it's not Kafka-related in our case.
>>>>>
>>>>> We normally have 30 MB/s of traffic between nodes and some logs that
>>>>> record a few durations from the topology. Whenever the topology freezes,
>>>>> traffic drops to 200 KB/s, complete latency drops drastically and the
>>>>> fail counts go to zero. We see only a fraction of our duration logs and
>>>>> the service stops working.
>>>>>
>>>>> Any ideas?
>>>>> On Sep 21, 2015 9:01 PM, "Andrey Yegorov" <[email protected]> wrote:
>>>>>
>>>>>> Hi Alex,
>>>>>>
>>>>>> Can you share how you solved/worked around the problem?
>>>>>> I hit something similar and I would appreciate any suggestions on how
>>>>>> to deal with it.
>>>>>> Thank you beforehand.
>>>>>>
>>>>>> ----------
>>>>>> Andrey Yegorov
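On the anchoring point Jitendra raises above, here is a minimal, hypothetical Java bolt (not from any of the topologies in this thread) that anchors each emit to its input tuple and acks the input exactly once after the work is done. A received tuple that was emitted unanchored upstream is not tracked by the acker, so acking it has no effect on the ack tree, which is presumably why that code "didn't make sense".

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical bolt: every downstream emit is anchored to the input tuple,
// and the input is acked exactly once after the work is done.
public class AnchoredBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // Anchored emit: a downstream failure will cause a replay from the spout.
            collector.emit(input, new Values(input.getString(0).toUpperCase()));
            collector.ack(input);
        } catch (Exception e) {
            // Fail instead of silently dropping; never ack the same tuple twice.
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result"));
    }
}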
>>>>>> On Thu, Jul 23, 2015 at 4:43 AM, Alex Sobrino <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We've got a pretty simple topology running with Storm 0.9.5 (also
>>>>>>> tried with 0.9.4 and 0.9.6-INCUBATING) on a 3-machine cluster:
>>>>>>>
>>>>>>> kafkaSpout (3) -----> processBolt (12)
>>>>>>>
>>>>>>> Some info:
>>>>>>> - kafkaSpout reads from a topic with 3 partitions and 2 replicas
>>>>>>> - processBolt iterates through the message and saves the results in MongoDB
>>>>>>> - processBolt is implemented in Python and has a storm.log("I'm
>>>>>>>   doing something") just to add a simple debug message to the logs
>>>>>>> - The messages can be quite big (~25-40 MB) and are in JSON format
>>>>>>> - The Kafka topic has a retention of 2 hours
>>>>>>> - We use the same ZooKeeper cluster for both Kafka and Storm
>>>>>>>
>>>>>>> The topology freezes after running for several hours (not days). We
>>>>>>> don't see any message in the logs... In fact, the periodic messages
>>>>>>> from s.k.KafkaUtils and s.k.ZkCoordinator disappear. As you can
>>>>>>> imagine, the message from the bolt also disappears. Logs are
>>>>>>> copy/pasted further on. If we redeploy the topology everything starts
>>>>>>> to work again until it freezes again.
>>>>>>>
>>>>>>> Our kafkaSpout config is:
>>>>>>>
>>>>>>> ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
>>>>>>> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic", "/topic/ourclientid", "ourclientid");
>>>>>>> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>> kafkaConfig.fetchSizeBytes = 50*1024*1024;
>>>>>>> kafkaConfig.bufferSizeBytes = 50*1024*1024;
>>>>>>>
>>>>>>> We've also tried setting the following options:
>>>>>>>
>>>>>>> kafkaConfig.forceFromStart = true;
>>>>>>> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // Also with kafka.api.OffsetRequest.LatestTime();
>>>>>>> kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>>>>>>
>>>>>>> Right now the topology is running without acking the messages since
>>>>>>> there's a bug in kafkaSpout with failed messages and deleted offsets
>>>>>>> in Kafka.
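As a side note for anyone reproducing this setup, below is a rough sketch, assuming Storm 0.9.x and the storm-kafka module, of how a topology shaped like the one above might be wired and submitted with acking effectively disabled. The bolt is the hypothetical AnchoredBolt sketched earlier in the thread, standing in for the real Python multilang bolt; component names and config values are taken from the description above.

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class OurTopology {
    public static void main(String[] args) throws Exception {
        ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic", "/topic/ourclientid", "ourclientid");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConfig.fetchSizeBytes = 50 * 1024 * 1024;
        kafkaConfig.bufferSizeBytes = 50 * 1024 * 1024;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", new KafkaSpout(kafkaConfig), 3);
        // AnchoredBolt is the stand-in sketched earlier; the original is a Python multilang bolt.
        builder.setBolt("processBolt", new AnchoredBolt(), 12).shuffleGrouping("kafkaSpout");

        Config conf = new Config();
        conf.setNumAckers(0);  // no ackers => tuples are not tracked and never replayed
        StormSubmitter.submitTopology("our-topology", conf, builder.createTopology());
    }
}

Setting the number of ackers to 0 means spout tuples are considered acked immediately and are never replayed, which matches running "without acking the messages" as described above.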
>>>>>>>
>>>>>>> This is what can be seen in the logs in one of the workers:
>>>>>>>
>>>>>>> 2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>>>>>> 2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>>>>>> 2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2, partition=1}]
>>>>>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>>>>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>>>>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>>>>> 2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>>>>>> 2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>>>>>> 2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>>>>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>>>>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>>>>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>>>>> 2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>>>>>> 2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>>>>>> 2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>>>>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>>>>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>>>>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>>>>> 2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>>>>>> 2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>>>>>> 2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>>>>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>>>>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>>>>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>>>>> 2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>>>>>> 2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>>>>>> 2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>>>>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>>>>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>>>>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>>>>>
>>>>>>> and then it becomes frozen. Nothing is written into the nimbus log.
>>>>>>> We've checked the offsets in ZooKeeper and they're not updated:
>>>>>>>
>>>>>>> {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
>>>>>>> cZxid = 0x100028958
>>>>>>> ctime = Wed Jul 01 12:22:36 CEST 2015
>>>>>>> mZxid = 0x100518527
>>>>>>> mtime = Thu Jul 23 12:42:41 CEST 2015
>>>>>>> pZxid = 0x100028958
>>>>>>> cversion = 0
>>>>>>> dataVersion = 446913
>>>>>>> aclVersion = 0
>>>>>>> ephemeralOwner = 0x0
>>>>>>> dataLength = 183
>>>>>>> numChildren = 0
>>>>>>>
>>>>>>> Any ideas of what we could be missing? Should we open a Jira issue?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> --
>>>>>>> Alex Sobrino Beltrán
>>>>>>> Registered Linux User #273657
>>>>>>>
>>>>>>> http://v5tech.es
>>>>
>>>> --
>>>> Regards,
>>>> Abhishek Agarwal
>>>
>>> --
>>> Alex Sobrino Beltrán
>>> Registered Linux User #273657
>>>
>>> http://v5tech.es

--
Regards,
Abhishek Agarwal
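For anyone who wants to script the same offset check Alex describes above, a rough sketch using the plain ZooKeeper Java client is below. The znode path is an assumption based on the SpoutConfig quoted above (storm-kafka commits the offset under zkRoot/id/partition_N); adjust it to whatever path you actually inspected.

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Hypothetical one-off check: read the spout's committed-offset znode and its mtime.
public class OffsetCheck {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zkhost01:2181,zkhost02:2181,zkhost03:2181", 15000,
                new Watcher() {
                    public void process(WatchedEvent event) { /* no-op */ }
                });
        // Assumed path: zkRoot ("/topic/ourclientid") + "/" + spout id ("ourclientid") + "/partition_1"
        String path = "/topic/ourclientid/ourclientid/partition_1";
        Stat stat = new Stat();
        byte[] data = zk.getData(path, false, stat);
        System.out.println(new String(data, "UTF-8"));
        // If mtime and dataVersion stop advancing between runs, the spout is no longer committing offsets.
        System.out.println("mtime=" + stat.getMtime() + ", dataVersion=" + stat.getVersion());
        zk.close();
    }
}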
