Hi,

Opened an issue with this problem:
https://issues.apache.org/jira/browse/STORM-963

Thanks,

On Thu, Jul 23, 2015 at 1:43 PM, Alex Sobrino <[email protected]> wrote:

> Hi,
>
> We've got a pretty simple topology running with Storm 0.9.5 (tried also
> with 0.9.4 and 0.9.6-INCUBATING) in a 3 machine cluster:
>
> kafkaSpout (3) -----> processBolt (12)
>
> Some info:
> - kafkaSpout reads from a topic with 3 partitions and 2 replications
> - processBolt iterates throught the message and saves the results in
> MongoDB
> - processBolt is implemented in Python and has a storm.log("I'm doing
> something") just to add a simple debug message in the logs
> - The messages can be quite big (~25-40 MB) and are in JSON format
> - The kafka topic has a retention of 2 hours
> - We use the same ZooKeeper cluster to both Kafka and Storm
>
> The topology gets frozen after several hours (not days) running. We don't
> see any message in the logs... In fact, the periodic message from
> s.k.KafkaUtils and s.k.ZkCoordinator disapears. As you can imagine, the
> message from the Bolt also dissapears. Logs are copy/pasted further on. If
> we redeploy the topology everything starts to work again until it becomes
> frozen again.
>
>
>
> Our kafkaSpout config is:
>
> ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic",
> "/topic/ourclientid", "ourclientid");
> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> kafkaConfig.fetchSizeBytes = 50*1024*1024;
> kafkaConfig.bufferSizeBytes = 50*1024*1024;
>
> We've also tried setting the following options
>
> kafkaConfig.forceFromStart = true;
> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); //
> Also with kafka.api.OffsetRequest.LatestTime();
> kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>
> Right now the topology is running without acking the messages since
> there's a bug in kafkaSpout with failed messages and deleted offsets in
> Kafka.
>
>
>
> This is what can be seen in the logs in one of the workers:
>
> 2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3]
> Refreshing partition manager connections
> 2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
> 2=kafka3:9092}}
> 2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
> [Partition{host=kafka2, partition=1}]
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
> partition managers: []
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
> partition managers: []
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished
> refreshing
> 2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3]
> Refreshing partition manager connections
> 2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
> 2=kafka3:9092}}
> 2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
> partition managers: []
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
> partition managers: []
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished
> refreshing
> 2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3]
> Refreshing partition manager connections
> 2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
> 2=kafka3:9092}}
> 2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
> partition managers: []
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
> partition managers: []
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished
> refreshing
> 2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3]
> Refreshing partition manager connections
> 2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
> 2=kafka3:9092}}
> 2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
> partition managers: []
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
> partition managers: []
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished
> refreshing
> 2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3]
> Refreshing partition manager connections
> 2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
> 2=kafka3:9092}}
> 2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
> partition managers: []
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
> partition managers: []
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished
> refreshing
>
>
> and then it becomes frozen. Nothing is written into the nimbus log. We've
> checked the offsets in ZooKeeper and they're not updated:
>
>
> {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
> cZxid = 0x100028958
> ctime = Wed Jul 01 12:22:36 CEST 2015
> mZxid = 0x100518527
> mtime = Thu Jul 23 12:42:41 CEST 2015
> pZxid = 0x100028958
> cversion = 0
> dataVersion = 446913
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 183
> numChildren = 0
>
>
>
> Any ideas of what we could be missing? Should we open a Jira Issue?
>
> Thanks!
>
> --
> Alex Sobrino Beltrán
> Registered Linux User #273657
>
> http://v5tech.es
>



-- 
Alex Sobrino Beltrán
Registered Linux User #273657

http://v5tech.es

Reply via email to