In such situations, it's better to take a jstack dump and see where things are
stuck.
@Andrey - Does your spout emit multiple tuples in a single nextTuple call?
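
For reference, the usual way to get the dump mentioned above is to run
`jstack <pid>` against the stuck worker JVM. When attaching to the process is
not convenient, roughly the same information can be collected from inside the
JVM with the standard java.lang.management API. The sketch below is only an
illustration: the class name ThreadDumpSketch and the method dumpThreads() are
made up for this example and are not part of Storm.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper, not part of Storm: builds a jstack-like dump
// of every live thread in the current JVM.
final class ThreadDumpSketch {

    static String dumpThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // false/false: skip monitor and synchronizer details, which keeps the
        // call cheap and avoids depending on optional JVM capabilities.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            // Print every frame; ThreadInfo.toString() truncates the stack.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpThreads());
    }
}
```

Taking two or three dumps a few seconds apart and comparing them usually shows
which threads are not making progress.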

On Tue, Sep 22, 2015 at 11:35 PM, Jitendra Yadav <[email protected]> wrote:

> We faced a similar issue in the past. After several days of research we
> found that the topology was freezing after 5-6 minutes because of a code
> issue: we were trying to acknowledge an unanchored tuple in one of our
> bolts, which doesn't actually make sense, but it is a kind of weird
> situation.
>
> Thanks
> Jitendra
>
> On Tue, Sep 22, 2015 at 8:34 AM, Andrey Yegorov <[email protected]>
> wrote:
>
>> FWIW, I am not using multilang.
>>
>> Topology is written in java; I am using storm 0.9.4.
>>
>> ----------
>> Andrey Yegorov
>>
>> On Tue, Sep 22, 2015 at 4:20 AM, Alex Sobrino <[email protected]> wrote:
>>
>>> Hi guys,
>>>
>>> What we've seen so far is that it's *not* a KafkaSpout-only issue: it's
>>> related to how Storm and the multilang protocol are implemented in Python.
>>>
>>> If the bolt's process() execution time is long enough, the
>>> corresponding heartbeat tuples time out. That leaves you in a situation
>>> where your bolt is ready to process another tuple but Storm thinks it had
>>> some kind of problem. If I remember correctly, Storm "hangs" waiting for
>>> the multilang process error to be printed through the stderr output.
>>> That'll never happen, and thus the topology seems to be hung.
>>>
>>> Storm thinks the multilang process had a problem and is waiting for a
>>> stderr message, but the multilang process is waiting for another tuple... I
>>> think this issue covers this problem:
>>> https://issues.apache.org/jira/browse/STORM-738
>>>
>>> What we've done in our projects is either:
>>> 1) Rewrite the Bolts implementation in Java
>>> 2) Fall back to Storm 0.9.2 where the multilang protocol didn't have to
>>> handle heartbeat tuples
>>>
>>> Hope this helps...
>>>
>>> On Tue, Sep 22, 2015 at 1:05 PM, Abhishek Agarwal <[email protected]>
>>> wrote:
>>>
>>>> Could this be the issue you guys are facing?
>>>> https://issues.apache.org/jira/browse/STORM-1027
>>>> FYI, the above can happen for non-Kafka sources as well.
>>>>
>>>> On Mon, Sep 21, 2015 at 11:39 PM, Onur Yalazı <[email protected]>
>>>> wrote:
>>>>
>>>>> I think we have a similar issue. We are using beanstalkd as a message
>>>>> source, so it's not Kafka-related in our case.
>>>>>
>>>>> We normally have 30MB/s of traffic between nodes, and some logs recording
>>>>> a few durations from the topology. Whenever the topology freezes, traffic
>>>>> comes down to 200KB/s, complete latency drops drastically, and fail
>>>>> counts go to zero. We see only a fraction of our duration logs, and the
>>>>> service stops working.
>>>>>
>>>>> Any ideas?
>>>>> On Sep 21, 2015 9:01 PM, "Andrey Yegorov" <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Alex,
>>>>>>
>>>>>> Can you share how you solved or worked around the problem?
>>>>>> I hit something similar and I would appreciate any suggestions on how
>>>>>> to deal with it.
>>>>>> Thank you in advance.
>>>>>>
>>>>>> ----------
>>>>>> Andrey Yegorov
>>>>>>
>>>>>> On Thu, Jul 23, 2015 at 4:43 AM, Alex Sobrino <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We've got a pretty simple topology running with Storm 0.9.5 (tried
>>>>>>> also with 0.9.4 and 0.9.6-INCUBATING) in a 3 machine cluster:
>>>>>>>
>>>>>>> kafkaSpout (3) -----> processBolt (12)
>>>>>>>
>>>>>>> Some info:
>>>>>>> - kafkaSpout reads from a topic with 3 partitions and 2 replicas
>>>>>>> - processBolt iterates through the message and saves the results
>>>>>>> in MongoDB
>>>>>>> - processBolt is implemented in Python and has a storm.log("I'm
>>>>>>> doing something") just to add a simple debug message in the logs
>>>>>>> - The messages can be quite big (~25-40 MB) and are in JSON format
>>>>>>> - The kafka topic has a retention of 2 hours
>>>>>>> - We use the same ZooKeeper cluster for both Kafka and Storm
>>>>>>>
>>>>>>> The topology freezes after several hours (not days) of running. We
>>>>>>> don't see any messages in the logs... In fact, the periodic messages from
>>>>>>> s.k.KafkaUtils and s.k.ZkCoordinator disappear. As you can imagine,
>>>>>>> the message from the bolt also disappears. Logs are copy/pasted further
>>>>>>> on.
>>>>>>> If we redeploy the topology, everything starts to work again until it
>>>>>>> freezes again.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Our kafkaSpout config is:
>>>>>>>
>>>>>>> ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
>>>>>>> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic", "/topic/ourclientid", "ourclientid");
>>>>>>> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>>> kafkaConfig.fetchSizeBytes = 50*1024*1024;
>>>>>>> kafkaConfig.bufferSizeBytes = 50*1024*1024;
>>>>>>>
>>>>>>> We've also tried setting the following options
>>>>>>>
>>>>>>> kafkaConfig.forceFromStart = true;
>>>>>>> // Also with kafka.api.OffsetRequest.LatestTime();
>>>>>>> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
>>>>>>> kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>>>>>>
>>>>>>> Right now the topology is running without acking the messages since
>>>>>>> there's a bug in kafkaSpout with failed messages and deleted
>>>>>>> offsets in Kafka.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> This is what can be seen in the logs in one of the workers:
>>>>>>>
>>>>>>> 2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog
>>>>>>> pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog
>>>>>>> pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog
>>>>>>> pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog
>>>>>>> pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Refreshing partition manager connections
>>>>>>> 2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read
>>>>>>> partition info from zookeeper:
>>>>>>> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
>>>>>>> 2=kafka3:9092}}
>>>>>>> 2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3]
>>>>>>> assigned [Partition{host=kafka2, partition=1}]
>>>>>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Deleted partition managers: []
>>>>>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
>>>>>>> partition managers: []
>>>>>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Finished refreshing
>>>>>>> 2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog
>>>>>>> pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog
>>>>>>> pid:28364, name:processBolt I'm doing something
>>>>>>> 2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Refreshing partition manager connections
>>>>>>> 2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read
>>>>>>> partition info from zookeeper:
>>>>>>> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
>>>>>>> 2=kafka3:9092}}
>>>>>>> 2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3]
>>>>>>> assigned [Partition{host=kafka2:9092, partition=1}]
>>>>>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Deleted partition managers: []
>>>>>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
>>>>>>> partition managers: []
>>>>>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Finished refreshing
>>>>>>> 2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Refreshing partition manager connections
>>>>>>> 2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read
>>>>>>> partition info from zookeeper:
>>>>>>> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
>>>>>>> 2=kafka3:9092}}
>>>>>>> 2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3]
>>>>>>> assigned [Partition{host=kafka2:9092, partition=1}]
>>>>>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Deleted partition managers: []
>>>>>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
>>>>>>> partition managers: []
>>>>>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Finished refreshing
>>>>>>> 2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Refreshing partition manager connections
>>>>>>> 2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read
>>>>>>> partition info from zookeeper:
>>>>>>> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
>>>>>>> 2=kafka3:9092}}
>>>>>>> 2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3]
>>>>>>> assigned [Partition{host=kafka2:9092, partition=1}]
>>>>>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Deleted partition managers: []
>>>>>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
>>>>>>> partition managers: []
>>>>>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Finished refreshing
>>>>>>> 2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Refreshing partition manager connections
>>>>>>> 2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read
>>>>>>> partition info from zookeeper:
>>>>>>> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
>>>>>>> 2=kafka3:9092}}
>>>>>>> 2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3]
>>>>>>> assigned [Partition{host=kafka2:9092, partition=1}]
>>>>>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Deleted partition managers: []
>>>>>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
>>>>>>> partition managers: []
>>>>>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>>>>>> Finished refreshing
>>>>>>>
>>>>>>>
>>>>>>> and then it becomes frozen. Nothing is written into the nimbus log.
>>>>>>> We've checked the offsets in ZooKeeper and they're not updated:
>>>>>>>
>>>>>>>
>>>>>>> {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
>>>>>>> cZxid = 0x100028958
>>>>>>> ctime = Wed Jul 01 12:22:36 CEST 2015
>>>>>>> mZxid = 0x100518527
>>>>>>> mtime = Thu Jul 23 12:42:41 CEST 2015
>>>>>>> pZxid = 0x100028958
>>>>>>> cversion = 0
>>>>>>> dataVersion = 446913
>>>>>>> aclVersion = 0
>>>>>>> ephemeralOwner = 0x0
>>>>>>> dataLength = 183
>>>>>>> numChildren = 0
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Any ideas of what we could be missing? Should we open a Jira Issue?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> --
>>>>>>> Alex Sobrino Beltrán
>>>>>>> Registered Linux User #273657
>>>>>>>
>>>>>>> http://v5tech.es
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Abhishek Agarwal
>>>>
>>>>
>>>
>>>
>>> --
>>> Alex Sobrino Beltrán
>>> Registered Linux User #273657
>>>
>>> http://v5tech.es
>>>
>>
>>
>


-- 
Regards,
Abhishek Agarwal
