I think you should add another ZooKeeper node. You generally want an odd number, since ZooKeeper requires a majority (floor(n/2) + 1) of nodes to be available for the cluster to function. With 2 nodes the majority is 2, so your cluster will stop serving requests if either node goes down.
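To make the majority arithmetic concrete, here is a tiny illustration (the quorum() helper is made up for this mail, not part of any ZooKeeper API):

    // Illustration only: quorum size for a ZooKeeper ensemble of n nodes.
    public class QuorumMath {
        static int quorum(int n) {
            return n / 2 + 1; // integer division, i.e. floor(n/2) + 1
        }

        public static void main(String[] args) {
            for (int n = 2; n <= 5; n++) {
                System.out.println(n + " nodes -> quorum " + quorum(n)
                        + ", tolerates " + (n - quorum(n)) + " failure(s)");
            }
        }
    }

Note that going from 2 nodes to 3 buys you tolerance of one failure, while going from 3 to 4 buys you nothing extra.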
I haven't been able to spot an issue with the spout that could cause the behavior you are seeing. This is the code we use to determine which offsets to commit https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L70. As far as I can tell, it can't return anything smaller than the current committed offset. We initialize the committed offset here https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L170, based on the committed offset returned by Kafka and the first poll offset strategy, which in your case should be UNCOMMITTED_EARLIEST.
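To sketch why: the commit calculation only ever walks forward from the last committed offset. Roughly like this (a simplified illustration, not the actual OffsetManager code; all names below are made up):

    import java.util.TreeSet;

    // Simplified sketch of the commit-offset logic linked above; see
    // OffsetManager.findNextCommitOffset for the real implementation.
    public class CommitOffsetSketch {
        private long committedOffset;                         // next offset to be committed
        private final TreeSet<Long> acked = new TreeSet<>();  // offsets acked so far

        public CommitOffsetSketch(long committedOffset) {
            this.committedOffset = committedOffset;
        }

        public void ack(long offset) {
            acked.add(offset);
        }

        // Returns the next offset to commit, or null if nothing is committable.
        // The candidate only moves forward from committedOffset, so it can never
        // be smaller than what was already committed.
        public Long findNextCommitOffset() {
            long next = committedOffset;
            boolean found = false;
            for (long offset : acked) {
                if (offset == next) {        // contiguous with the committed offset
                    found = true;
                    next++;
                } else if (offset > next) {  // gap: a lower offset isn't acked yet
                    break;
                }
            }
            return found ? next : null;
        }
    }

So a backwards jump like the 8330 -> 6512 in your log extract below shouldn't be produced by this calculation.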
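For reference, the first poll offset strategy is set on the spout config builder, and UNCOMMITTED_EARLIEST is the default if you don't set one. From memory, in storm-kafka-client 1.1.0 it looks roughly like this (please double-check against the 1.1.0 javadoc; "localhost:9092" and "users" are placeholders for your values):

    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;

    // UNCOMMITTED_EARLIEST: start from the committed offset if Kafka has one,
    // otherwise start from the earliest offset in the partition.
    KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
        .builder("localhost:9092", "users") // placeholders
        .setFirstPollOffsetStrategy(
            KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .build();
    KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);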
2017-08-22 9:03 GMT+02:00 Elyahou Ittah <[email protected]>:

> I checked the __consumer_offsets topic, and here is an extraction from this log for the same consumer group, a specific topic (users) and a specific partition (15):
>
> [storm_kafka_topology,users,15]::[OffsetMetadata[8327,{topic-partition=users-15, offset=8327, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230031557,ExpirationTime 1503316431557]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230332504,ExpirationTime 1503316732504]
> [storm_kafka_topology,users,15]::[OffsetMetadata[6512,{topic-partition=users-15, offset=6512, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230748612,ExpirationTime 1503317148612]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8172,{topic-partition=users-15, offset=8172, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230791209,ExpirationTime 1503317191209]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230821337,ExpirationTime 1503317221337]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8333,{topic-partition=users-15, offset=8333, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231513311,ExpirationTime 1503317913311]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8338,{topic-partition=users-15, offset=8338, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231603513,ExpirationTime 1503318003513]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8344,{topic-partition=users-15, offset=8344, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231693829,ExpirationTime 1503318093829]
>
> We can see here that the consumer was at offset 8330 at Sunday, August 20, 2017 11:53:51.557 AM and at offset 6512 some minutes after (the Kafka restart occurred at this time).
>
> On Tue, Aug 22, 2017 at 12:31 AM, Elyahou Ittah <[email protected]> wrote:
>
>> The topology is working well and committing offsets for some time, and I also restarted it and saw it start from the last committed offset. I saw the issue only at the Kafka restart.
>>
>> I have two Zookeepers and they were not restarted.
>>
>> On Tue, Aug 22, 2017 at 12:24 AM, Stig Rohde Døssing <[email protected]> wrote:
>>
>>> I think the __consumer_offsets configuration looks fine, I just wanted to be sure there wasn't only one replica. How many Zookeepers do you have, and were they restarted as well?
>>>
>>> I would suspect that the spout isn't committing properly for some reason. The default behavior is to get the committed offset from Kafka when starting, and if it is present it is used. If it isn't there, the spout starts over from the beginning of the partitions. You can check whether the spout is committing by enabling debug logging for the classes in storm-kafka-client [see the log4j2 sketch at the end of this mail], looking for logs like this one: https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L118
>>>
>>> 2017-08-21 22:20 GMT+02:00 Elyahou Ittah <[email protected]>:
>>>
>>>> Hi Stig,
>>>>
>>>> I don't have this kind of error normally. It just occurred at the rolling restart of Kafka.
>>>>
>>>> Also, the __consumer_offsets configuration is:
>>>>
>>>> Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
>>>> Topic: __consumer_offsets Partition: 0 Leader: 0 Replicas: 2,1,0 Isr: 0,1
>>>> ...
>>>>
>>>> Can the fact that the replication factor is 3 even though there are only two brokers cause an issue?
>>>>
>>>> On Mon, Aug 21, 2017 at 6:51 PM, Stig Rohde Døssing <[email protected]> wrote:
>>>>
>>>>> The spout will re-emit some messages if it fails to commit offsets to Kafka. Are these CommitFailedExceptions occurring in your logs normally?
>>>>>
>>>>> Also, since the spout stores offsets in Kafka, you may want to check the replication factor on that topic by running `./kafka-topics.sh --zookeeper localhost:2181 --describe --topic __consumer_offsets` in one of your Kafka /bin directories.
>>>>>
>>>>> 2017-08-21 17:17 GMT+02:00 Elyahou Ittah <[email protected]>:
>>>>>
>>>>>> The config is the default one, I just set the bootstrap server.
>>>>>>
>>>>>> Kafka version is 0.11.
>>>>>>
>>>>>> Storm-kafka-client is 1.1.0.
>>>>>>
>>>>>> On Mon, Aug 21, 2017 at 5:48 PM, Stig Rohde Døssing <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Elyahou,
>>>>>>>
>>>>>>> Could you post your spout configuration, Kafka version and storm-kafka-client version? The logs imply that your spouts are not polling often enough.
>>>>>>>
>>>>>>> 2017-08-21 9:44 GMT+02:00 Elyahou Ittah <[email protected]>:
>>>>>>>
>>>>>>>> I noticed that the storm kafka spout reconsumes all Kafka messages after a rolling restart of the Kafka cluster.
>>>>>>>>
>>>>>>>> This issue occurred only with the KafkaSpout consumer and not with my other consumers (Ruby-based, using the Kafka consumer API like KafkaSpout).
>>>>>>>>
>>>>>>>> Attached are logs of the spout.
>>>>>>>>
>>>>>>>> Do you know what can cause this kind of behavior?
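P.S. To enable the debug logging mentioned above, add a logger for the spout's package to the worker log4j2 configuration. Something like the following (the file is typically log4j2/worker.xml under the Storm installation, but the exact location depends on your setup):

    <!-- inside the <Loggers> element of the worker log4j2 config -->
    <Logger name="org.apache.storm.kafka.spout" level="debug"/>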
