We think we found the root cause of our issue. After starting the topology with a different consumer group id, it worked fine at first because it started at the end of the topic. We then set the forceFromStart flag to true and, after letting that topology consume from the topic long enough, it stopped working at the exact same offset as our original consumer group id.
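To picture how a consumer can stall at one exact offset while the fetch buffer still holds bytes, here is a toy model (illustrative only, not Kafka's actual wire format or code): records carry a checksum, and the reader stops at the first record whose checksum does not match, so everything after the faulty offset becomes unreachable even though it sits in the buffer.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Toy model: each record is [length][crc][payload]. The reader stops at the
// first record whose CRC does not match, mirroring how a message-set
// iterator can come back empty or short even though the underlying
// buffer still contains data past the faulty offset.
public class CorruptScanDemo {

    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue();
    }

    // Append one record; optionally store a wrong CRC to simulate corruption.
    static void append(ByteBuffer buf, byte[] payload, boolean corrupt) {
        buf.putInt(payload.length);
        buf.putLong(corrupt ? crcOf(payload) + 1 : crcOf(payload));
        buf.put(payload);
    }

    // Returns the payloads readable before the first corrupt record.
    static List<String> readValid(ByteBuffer buf) {
        List<String> out = new ArrayList<>();
        while (buf.remaining() >= 12) {
            int len = buf.getInt();
            long storedCrc = buf.getLong();
            byte[] payload = new byte[len];
            buf.get(payload);
            if (crcOf(payload) != storedCrc) break; // corrupt: stop iterating
            out.add(new String(payload));
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        append(buf, "m1".getBytes(), false);
        append(buf, "m2".getBytes(), true);  // simulated corruption here
        append(buf, "m3".getBytes(), false); // never reached by the reader
        buf.flip();
        System.out.println(readValid(buf)); // only records before the corruption
    }
}
```

Under this model, every consumer group that reaches the faulty offset stalls at the same place, which matches the behaviour described above.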
Last week we ran out of disk space; could that have caused the topic to become corrupt at some offset? As for the ByteBufferMessageSet object, its internal buffer contained actual data from our topic, but the iterator was empty. It looks like the internal buffer got filled up until it reached the faulty data.

Francois

On Tue, Mar 24, 2015 at 12:29 PM, François Méthot <[email protected]> wrote:
>
> Thanks a lot for your replies. Knowing that we are on the right track, we
> downloaded the source code and stepped through the KafkaSpout code to
> see if we could get any hints on what is going wrong.
>
> The PartitionManager successfully gets the jsonOffset.
>
> Then it tries to fetch messages with:
>
>     msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition,
>         offset);
>
> The returned variable msgs is a ByteBufferMessageSet.
>
> It never enters the following loop:
>
>     for (MessageAndOffset msg : msgs) {
>
> One other thing I was not sure is normal: when it calls
> KafkaUtils.fetchMessages, the _spoutConfig is set with the
> expected id="ConsumerGroupId", but clientId is set to an empty "".
>
> This is where I am at; any feedback would be appreciated.
>
> François
>
> P.S.: Before I stepped into the source, I was able to make it work by
> renaming the consumer group id. After that, stopping and starting, with
> and without the force flag, did not cause any issue. Running the same
> topology with the original consumer group id is still not working.
>
> On Mon, Mar 23, 2015 at 5:08 PM, Curtis Allen <[email protected]> wrote:
>
>> Francois,
>> I've used the same pattern you've described and didn't have any problems.
>> Your approach is valid. There must be something else going on.
>>
>> On Mon, Mar 23, 2015 at 2:28 PM Harsha <[email protected]> wrote:
>>
>>> It looks like your approach is right. Once you turn off forceFromStart
>>> and set the offset time to earliestTime, only new events from the Kafka
>>> topic will be read.
>>> Are you sure that your Kafka topic has new data coming in?
>>>
>>> --
>>> Harsha
>>>
>>> On March 23, 2015 at 12:48:04 PM, François Méthot ([email protected])
>>> wrote:
>>>
>>> Hi,
>>>
>>> We have a Storm topology that uses Kafka to read a topic with 6
>>> partitions (Kafka 0.8.2, Storm 0.9.3).
>>>
>>> Recently, we had to set the KafkaSpout to read from the beginning, so we
>>> temporarily configured our KafkaConfig this way:
>>>
>>>     kafkaConfig.forceFromStart = true;
>>>     kafkaConfig.startOffsetTime = OffsetRequest.EarliestTime();
>>>
>>> It worked well, but afterward, setting those parameters back to false
>>> and to LatestTime respectively had no effect. In fact, the topology
>>> won't read from our topic anymore.
>>>
>>> When the topology starts, the spout successfully logs the offset and
>>> consumer group's cursor position for each partition in the worker log,
>>> but nothing is read.
>>>
>>> The only way we can read back from our topic is to give our SpoutConfig
>>> a new Kafka consumer group id, e.g.:
>>>
>>>     new SpoutConfig(zk, topic, zkStormRoot, newConsumerGroupID)
>>>
>>> Now, the only way I can see to read from the beginning would be to write
>>> the position we want to read from into ZooKeeper, where the consumer
>>> group offsets are stored, and to restart our topology. I haven't tried
>>> it yet.
>>>
>>> Bottom line, it looks like as soon as you use this forceFromStart flag,
>>> the consumer group id becomes unusable.
>>>
>>> Would anyone know if this is a bug in the KafkaSpout or an issue
>>> inherited from a bug in Kafka?
>>>
>>> Thanks,
>>> Francois
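The manual-reset idea mentioned in the thread (writing the desired position into ZooKeeper and restarting the topology) can be sketched as follows. This is a hedged sketch, not a confirmed procedure: `committedPath` and `offsetJson` are hypothetical helper names, and the node layout and JSON shape are assumptions about how storm-kafka persists spout state; verify both against your actual ZooKeeper tree (e.g. with `zkCli.sh get <path>`) before writing anything back.

```java
// Hypothetical helpers that build the ZooKeeper node path and JSON payload
// a KafkaSpout consumer group might use to persist its offsets. The exact
// layout is an assumption and must be checked against your own ZK tree
// (zkCli.sh) before any write; getting it wrong can confuse the spout.
public class ZkOffsetPath {

    // Assumed path shape: <zkRoot>/<consumerGroupId>/partition_<n>
    static String committedPath(String zkRoot, String consumerGroupId, int partition) {
        return zkRoot + "/" + consumerGroupId + "/partition_" + partition;
    }

    // Assumed payload shape with the offset to resume from.
    static String offsetJson(String topologyId, String topologyName, long offset,
                             int partition, String brokerHost, int brokerPort,
                             String topic) {
        return String.format(
            "{\"topology\":{\"id\":\"%s\",\"name\":\"%s\"},"
            + "\"offset\":%d,\"partition\":%d,"
            + "\"broker\":{\"host\":\"%s\",\"port\":%d},\"topic\":\"%s\"}",
            topologyId, topologyName, offset, partition,
            brokerHost, brokerPort, topic);
    }

    public static void main(String[] args) {
        // Example values only; substitute your own topology and broker details.
        System.out.println(committedPath("/kafkaspout", "myGroup", 3));
        System.out.println(offsetJson("topo-1-1427000000", "topo", 42L, 3,
                                      "kafka01", 9092, "events"));
    }
}
```

With the topology stopped, the payload would be written to the node with a ZooKeeper client (e.g. `zkCli.sh set <path> '<json>'`), once per partition, and the topology restarted with forceFromStart left off.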
