FWIW, I am not using multilang. The topology is written in Java; I am using Storm 0.9.4.
----------
Andrey Yegorov

On Tue, Sep 22, 2015 at 4:20 AM, Alex Sobrino <[email protected]> wrote:

> Hi guys,
>
> What we've seen so far is that it's *not* a KafkaSpout-only issue: it's related to how Storm's multilang protocol is implemented on the Python side.
>
> If the bolt's process() execution time is long enough, the corresponding heartbeat tuples time out. That leaves you in a situation where your bolt is ready to process another tuple, but Storm thinks it had some kind of problem. If I remember correctly, Storm "hangs" waiting for the multilang process's error to be printed to stderr. That will never happen, and thus the topology seems to be hung.
>
> Storm thinks the multilang process had a problem and is waiting for a stderr message, but the multilang process is waiting for another tuple... I think this issue covers this problem: https://issues.apache.org/jira/browse/STORM-738
>
> What we've done in our projects is either:
> 1) Rewrite the bolt implementations in Java
> 2) Fall back to Storm 0.9.2, where the multilang protocol didn't have to handle heartbeat tuples
>
> Hope this helps...
>
> On Tue, Sep 22, 2015 at 1:05 PM, Abhishek Agarwal <[email protected]> wrote:
>
>> Could this be the issue you guys are facing? https://issues.apache.org/jira/browse/STORM-1027
>> FYI, the above can happen for non-Kafka sources as well.
>>
>> On Mon, Sep 21, 2015 at 11:39 PM, Onur Yalazı <[email protected]> wrote:
>>
>>> I think we have a similar issue. We are using beanstalkd as a message source, so it's not Kafka-related in our case.
>>>
>>> We normally have 30 MB/s of traffic between nodes and some logs that record a few topology durations. Whenever the topology freezes, the traffic drops to 200 KB/s, the complete latency drops drastically and the fail counts go to zero. We see only a fraction of our duration logs and the service stops working.
>>>
>>> Any ideas?
>>> On Sep 21, 2015 9:01 PM, "Andrey Yegorov" <[email protected]> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> Can you share how you solved/worked around the problem?
>>>> I hit something similar and I would appreciate any suggestions on how to deal with it.
>>>> Thank you in advance.
>>>>
>>>> ----------
>>>> Andrey Yegorov
>>>>
>>>> On Thu, Jul 23, 2015 at 4:43 AM, Alex Sobrino <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We've got a pretty simple topology running with Storm 0.9.5 (we've also tried 0.9.4 and 0.9.6-INCUBATING) in a 3-machine cluster:
>>>>>
>>>>> kafkaSpout (3) -----> processBolt (12)
>>>>>
>>>>> Some info:
>>>>> - kafkaSpout reads from a topic with 3 partitions and a replication factor of 2
>>>>> - processBolt iterates through the message and saves the results in MongoDB
>>>>> - processBolt is implemented in Python and has a storm.log("I'm doing something") just to add a simple debug message to the logs
>>>>> - The messages can be quite big (~25-40 MB) and are in JSON format
>>>>> - The Kafka topic has a retention of 2 hours
>>>>> - We use the same ZooKeeper cluster for both Kafka and Storm
>>>>>
>>>>> The topology freezes after several hours (not days) of running. We don't see any messages in the logs... In fact, the periodic messages from s.k.KafkaUtils and s.k.ZkCoordinator disappear. As you can imagine, the messages from the bolt also disappear. Logs are copy/pasted further on.
>>>>> If we redeploy the topology, everything starts to work again until it freezes again.
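As a point of reference, a Python bolt like the processBolt described above is normally hooked into a Java topology through Storm's multilang ShellBolt wrapper; it is that wrapper-to-subprocess channel that carries the heartbeat tuples discussed earlier in the thread. A minimal sketch, where the class name and the processbolt.py script name are assumptions rather than details taken from this thread:

    import backtype.storm.task.ShellBolt;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;

    import java.util.Map;

    // Standard multilang wrapper: Storm spawns the Python process and exchanges
    // tuples (and, from 0.9.3 on, heartbeats) with it over stdin/stdout.
    public class ProcessBolt extends ShellBolt implements IRichBolt {
        public ProcessBolt() {
            // "processbolt.py" is a placeholder; the script ships in the topology
            // jar's resources/ directory.
            super("python", "processbolt.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // The bolt only writes to MongoDB, so it declares no output stream.
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }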
>>>>>
>>>>> Our kafkaSpout config is:
>>>>>
>>>>> ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
>>>>> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic", "/topic/ourclientid", "ourclientid");
>>>>> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>> kafkaConfig.fetchSizeBytes = 50*1024*1024;
>>>>> kafkaConfig.bufferSizeBytes = 50*1024*1024;
>>>>>
>>>>> We've also tried setting the following options:
>>>>>
>>>>> kafkaConfig.forceFromStart = true;
>>>>> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // Also with kafka.api.OffsetRequest.LatestTime();
>>>>> kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>>>>
>>>>> Right now the topology is running without acking the messages, since there's a bug in kafkaSpout with failed messages and deleted offsets in Kafka.
>>>>>
>>>>> This is what can be seen in the logs of one of the workers:
>>>>>
>>>>> 2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>> 2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>> 2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>> 2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>> 2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>>>> 2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>>>> 2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2, partition=1}]
>>>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>>>> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>>> 2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>> 2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
>>>>> 2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>>>> 2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>>>> 2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>>>> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>>> 2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>>>> 2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>>>> 2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>>>> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>>> 2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>>>> 2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>>>> 2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>>>> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>>> 2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
>>>>> 2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
>>>>> 2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
>>>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
>>>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
>>>>> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
>>>>>
>>>>> and then it freezes. Nothing is written to the nimbus log. We've checked the offsets in ZooKeeper and they're not updated:
>>>>>
>>>>> {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
>>>>> cZxid = 0x100028958
>>>>> ctime = Wed Jul 01 12:22:36 CEST 2015
>>>>> mZxid = 0x100518527
>>>>> mtime = Thu Jul 23 12:42:41 CEST 2015
>>>>> pZxid = 0x100028958
>>>>> cversion = 0
>>>>> dataVersion = 446913
>>>>> aclVersion = 0
>>>>> ephemeralOwner = 0x0
>>>>> dataLength = 183
>>>>> numChildren = 0
>>>>>
>>>>> Any ideas of what we could be missing? Should we open a JIRA issue?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> --
>>>>> Alex Sobrino Beltrán
>>>>> Registered Linux User #273657
>>>>>
>>>>> http://v5tech.es
>>>>
>>
>> --
>> Regards,
>> Abhishek Agarwal
>
> --
> Alex Sobrino Beltrán
> Registered Linux User #273657
>
> http://v5tech.es
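As a point of reference for the ZooKeeper check above: the committed-offset node can also be read programmatically with the plain ZooKeeper client. A minimal sketch, assuming storm-kafka commits under <zkRoot>/<id>/partition_<N> (which for the SpoutConfig in this thread would be /topic/ourclientid/ourclientid/partition_1; verify the actual path in your ZooKeeper tree first):

    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    public class CheckSpoutOffset {
        public static void main(String[] args) throws Exception {
            // One-off read: 10s session timeout, no-op watcher.
            ZooKeeper zk = new ZooKeeper("zkhost01:2181,zkhost02:2181,zkhost03:2181", 10000, event -> { });
            String path = "/topic/ourclientid/ourclientid/partition_1"; // assumed layout, see above
            Stat stat = new Stat();
            byte[] data = zk.getData(path, false, stat);
            // The payload is the JSON blob shown above; mtime tells you when it last moved.
            System.out.println(new String(data, "UTF-8"));
            System.out.println("mtime=" + stat.getMtime() + ", dataVersion=" + stat.getVersion());
            zk.close();
        }
    }

If mtime stops advancing while the workers are still up, the spout has simply stopped committing, which matches the frozen behaviour described in the thread.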

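Finally, a minimal end-to-end sketch tying the thread together. The kafkaSpout (3) -----> processBolt (12) layout and the spout config are taken from the original post; the ProcessBolt class is the wrapper sketched earlier, the shuffle grouping is an assumption, and the timeout settings are possible mitigations for the multilang heartbeat issue discussed above (topology.subprocess.timeout.secs exists, if I remember correctly, from Storm 0.9.3 onwards; please verify against your version before relying on it):

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    public class OurTopology {
        public static void main(String[] args) throws Exception {
            // Spout config as posted in the thread.
            ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
            SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic", "/topic/ourclientid", "ourclientid");
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            kafkaConfig.fetchSizeBytes = 50 * 1024 * 1024;
            kafkaConfig.bufferSizeBytes = 50 * 1024 * 1024;

            // kafkaSpout (3) -----> processBolt (12); the shuffle grouping is an assumption.
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafkaSpout", new KafkaSpout(kafkaConfig), 3);
            builder.setBolt("processBolt", new ProcessBolt(), 12).shuffleGrouping("kafkaSpout");

            Config conf = new Config();
            // Assumed mitigation, not from the thread: give slow multilang bolts more
            // headroom before their heartbeat is considered lost.
            conf.put(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS, 120);
            // With ~25-40 MB JSON messages, the tuple timeout also needs to be generous
            // if acking is ever re-enabled (the value here is a guess).
            conf.setMessageTimeoutSecs(600);

            StormSubmitter.submitTopology("our-topology", conf, builder.createTopology());
        }
    }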