Could this be the issue you guys are facing?
https://issues.apache.org/jira/browse/STORM-1027
FYI, above can happen for non-Kafka sources as well.

On Mon, Sep 21, 2015 at 11:39 PM, Onur Yalazı <[email protected]>
wrote:

> I think we have an issue similar. We are using benstalkd as a message
> source so it's not kafka related in our case.
>
> We have normally 30MB/s traffic between nodes and some logs writing down a
> few durations of topology. Whenever the topology freezes, traffic comes
> down to 200KB/s. and complete latency drops drastically and fail counts
> zeros. We see a fraction of our duration logs and service stops working.
>
> Any ideas?
> On Sep 21, 2015 9:01 PM, "Andrey Yegorov" <[email protected]>
> wrote:
>
>> Hi Alex,
>>
>> Can you share how have you solved/worked around the problem?
>> I hit something similar and I would appreciate any suggestions on how to
>> deal with it.
>> Thank you beforehand.
>>
>> ----------
>> Andrey Yegorov
>>
>> On Thu, Jul 23, 2015 at 4:43 AM, Alex Sobrino <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> We've got a pretty simple topology running with Storm 0.9.5 (tried also
>>> with 0.9.4 and 0.9.6-INCUBATING) in a 3 machine cluster:
>>>
>>> kafkaSpout (3) -----> processBolt (12)
>>>
>>> Some info:
>>> - kafkaSpout reads from a topic with 3 partitions and 2 replications
>>> - processBolt iterates throught the message and saves the results in
>>> MongoDB
>>> - processBolt is implemented in Python and has a storm.log("I'm doing
>>> something") just to add a simple debug message in the logs
>>> - The messages can be quite big (~25-40 MB) and are in JSON format
>>> - The kafka topic has a retention of 2 hours
>>> - We use the same ZooKeeper cluster to both Kafka and Storm
>>>
>>> The topology gets frozen after several hours (not days) running. We
>>> don't see any message in the logs... In fact, the periodic message from
>>> s.k.KafkaUtils and s.k.ZkCoordinator disapears. As you can imagine, the
>>> message from the Bolt also dissapears. Logs are copy/pasted further on. If
>>> we redeploy the topology everything starts to work again until it becomes
>>> frozen again.
>>>
>>>
>>>
>>> Our kafkaSpout config is:
>>>
>>> ZkHosts zkHosts = new
>>> ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
>>> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic",
>>> "/topic/ourclientid", "ourclientid");
>>> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>> kafkaConfig.fetchSizeBytes = 50*1024*1024;
>>> kafkaConfig.bufferSizeBytes = 50*1024*1024;
>>>
>>> We've also tried setting the following options
>>>
>>> kafkaConfig.forceFromStart = true;
>>> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); //
>>> Also with kafka.api.OffsetRequest.LatestTime();
>>> kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>>
>>> Right now the topology is running without acking the messages since
>>> there's a bug in kafkaSpout with failed messages and deleted offsets in
>>> Kafka.
>>>
>>>
>>>
>>> This is what can be seen in the logs in one of the workers:
>>>
>>> 2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
>>> name:processBolt I'm doing something
>>> 2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
>>> name:processBolt I'm doing something
>>> 2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
>>> name:processBolt I'm doing something
>>> 2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
>>> name:processBolt I'm doing something
>>> 2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>> Refreshing partition manager connections
>>> 2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read
>>> partition info from zookeeper:
>>> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
>>> 2=kafka3:9092}}
>>> 2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
>>> [Partition{host=kafka2, partition=1}]
>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
>>> partition managers: []
>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
>>> partition managers: []
>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>> Finished refreshing
>>> 2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
>>> name:processBolt I'm doing something
>>> 2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
>>> name:processBolt I'm doing something
>>> 2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>> Refreshing partition manager connections
>>> 2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read
>>> partition info from zookeeper:
>>> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
>>> 2=kafka3:9092}}
>>> 2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
>>> [Partition{host=kafka2:9092, partition=1}]
>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
>>> partition managers: []
>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
>>> partition managers: []
>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>> Finished refreshing
>>> 2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>> Refreshing partition manager connections
>>> 2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read
>>> partition info from zookeeper:
>>> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
>>> 2=kafka3:9092}}
>>> 2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
>>> [Partition{host=kafka2:9092, partition=1}]
>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
>>> partition managers: []
>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
>>> partition managers: []
>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>> Finished refreshing
>>> 2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>> Refreshing partition manager connections
>>> 2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read
>>> partition info from zookeeper:
>>> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
>>> 2=kafka3:9092}}
>>> 2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
>>> [Partition{host=kafka2:9092, partition=1}]
>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
>>> partition managers: []
>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
>>> partition managers: []
>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>> Finished refreshing
>>> 2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>> Refreshing partition manager connections
>>> 2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read
>>> partition info from zookeeper:
>>> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
>>> 2=kafka3:9092}}
>>> 2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
>>> [Partition{host=kafka2:9092, partition=1}]
>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
>>> partition managers: []
>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
>>> partition managers: []
>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3]
>>> Finished refreshing
>>>
>>>
>>> and then it becomes frozen. Nothing is written into the nimbus log.
>>> We've checked the offsets in ZooKeeper and they're not updated:
>>>
>>>
>>> {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
>>> cZxid = 0x100028958
>>> ctime = Wed Jul 01 12:22:36 CEST 2015
>>> mZxid = 0x100518527
>>> mtime = Thu Jul 23 12:42:41 CEST 2015
>>> pZxid = 0x100028958
>>> cversion = 0
>>> dataVersion = 446913
>>> aclVersion = 0
>>> ephemeralOwner = 0x0
>>> dataLength = 183
>>> numChildren = 0
>>>
>>>
>>>
>>> Any ideas of what we could be missing? Should we open a Jira Issue?
>>>
>>> Thanks!
>>>
>>> --
>>> Alex Sobrino Beltrán
>>> Registered Linux User #273657
>>>
>>> http://v5tech.es
>>>
>>
>>


-- 
Regards,
Abhishek Agarwal

Reply via email to