I've had some more time to look into this and think it must be a bug in Kafka.
I spun up my topology in a clean Docker environment and the offsets went to Kafka fine. Going back to my production Kafka, I changed the group.id in my test topology and spun it back up. Oddly, it was processing and tracking lag fine in the UI, but I didn't see offsets in either Kafka or Zookeeper.

On the Kafka side, when I do a --list with the consumer groups tool I don't see the group.id among all my others, but if I ask for it explicitly the tool crashes with:

Error while executing consumer group command
Group test-storm-group with protocol type '' is not a valid consumer group

Googling around revealed that this might be a bug in Kafka when a non-standard partitioner is in use in the consumer. So I think the offsets really are going to Kafka, I just can't see them using the tools. I think we'll move toward upgrading Kafka to the latest v1.1 in the coming months and see what happens.
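If anyone wants to double-check this without the CLI, something along these lines should read the committed offsets straight from the brokers (an untested sketch; the broker, topic, and group names are placeholders for your own):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommittedOffsetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "k10server:9092"); // placeholder broker
        props.put("group.id", "test-storm-group");        // the group the CLI can't describe
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Ask the brokers directly for the committed offset of each partition,
            // bypassing the kafka-consumer-groups tool entirely.
            for (PartitionInfo info : consumer.partitionsFor("topic")) {
                TopicPartition tp = new TopicPartition(info.topic(), info.partition());
                OffsetAndMetadata committed = consumer.committed(tp);
                System.out.println(tp + " -> "
                        + (committed == null ? "no committed offset" : committed.offset()));
            }
        }
    }
}

If that prints offsets for a group the tool refuses to describe, the commits are landing in Kafka and only the tooling is at fault.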
Thanks everyone.

On Sat, May 5, 2018 at 5:56 AM, Stig Rohde Døssing <[email protected]> wrote:

> Okay, that's odd. I think it would be helpful to confirm whether it's a
> problem in the Storm code or a problem with your Kafka setups. Could you
> try to start a KafkaConsumer manually with a new group id and commit an
> offset using either commitSync or commitAsync, and verify that the
> committed offset ends up in Kafka and not Zookeeper? Maybe also try doing
> the same with the consumer group you're using for the topology (after
> backing up the previous offset so you can restore it).
>
> Alexandre, your configuration looks good to me.
>
> https://gist.github.com/srdo/e2a75d45a227f1a43e96cd38ab7194d3 has a code
> snippet that will set up a consumer to read and commit messages. If you
> run this code (and point it to your own Kafka url and topic), you should
> see the "consumer-test" group show up in the output from
> kafka-consumer-groups. Please let me know whether it shows up for you. It
> would also be helpful if you'd try with one of the group ids you're using
> in the topologies.
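> For reference, the snippet is roughly along these lines (a paraphrased
> sketch, not necessarily the gist verbatim; the broker url and topic name
> are placeholders):
>
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> public class ConsumerCommitTest {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "consumer-test");
>         // Commit manually so it is obvious when and how offsets are written.
>         props.put("enable.auto.commit", "false");
>         props.put("key.deserializer", StringDeserializer.class.getName());
>         props.put("value.deserializer", StringDeserializer.class.getName());
>         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(Collections.singletonList("your-topic"));
>             ConsumerRecords<String, String> records = consumer.poll(5000);
>             System.out.println("Polled " + records.count() + " records");
>             // Synchronously commits the offsets from the last poll to Kafka.
>             consumer.commitSync();
>         }
>     }
> }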
>
> 2018-05-04 23:08 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
>
>> Hello Stig,
>>
>> Yes, we set the consumer group id for our Kafka spouts.
>> Here's a first example of the way we create them:
>>
>> KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
>>         .builder(supConfig.kafka_broker_hosts_str, myKafkaTopic)
>>         .setProp(ConsumerConfig.GROUP_ID_CONFIG, myConsumerId)
>>         .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
>>         .setRecordTranslator(new PodSyncNewPodsKafkaRecordTranslator())
>>         .build();
>>
>> KafkaSpout<String, String> newPodsFromKafkaSpout =
>>         new KafkaSpout<String, String>(kafkaSpoutConfig);
>>
>> and another one:
>>
>> Builder<String, ?> builder = KafkaSpoutConfig
>>         .builder(supConfig.kafka_broker_hosts_str, myTopic1, myTopic2)
>>         .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>                 StringDeserializer.class)
>>         .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>                 StringDeserializer.class)
>>         .setRecordTranslator(new RawEventKafkaRecordTranslator(true));
>> builder.setProp(kafkaConsumerProp)
>>         .setTupleTrackingEnforced(true)
>>         .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE)
>>         .setProp(ConsumerConfig.GROUP_ID_CONFIG, consumerId)
>>         .setFirstPollOffsetStrategy(strategy);
>> IRichSpout spout = new KafkaSpout<>(builder.build());
>>
>> Are we using the right way to set the consumer id?
>>
>> Best regards,
>> Alexandre Vermeerbergen
>>
>> 2018-05-04 22:46 GMT+02:00 Stig Rohde Døssing <[email protected]>:
>>
>>> There are a couple of other lines that may commit depending on the
>>> processing guarantee:
>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L391
>>> for AT_MOST_ONCE and
>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L294
>>> for NO_GUARANTEE, but basically we always use the KafkaConsumer commit*
>>> methods to commit offsets.
>>>
>>> 2018-05-04 22:13 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
>>>
>>>> Hello Stig,
>>>>
>>>> I have not checked where Storm Kafka client 1.2.x commits offsets, but
>>>> I'm pretty sure they are not committed to the Kafka brokers (hence
>>>> Eric's mention of Zookeeper seems realistic to me), because I have a
>>>> Kafka lag probe (based on the Kafka APIs, not on Storm's Kafka tooling)
>>>> which isn't able to find the consumers corresponding to the Kafka
>>>> spouts of my topologies.
>>>>
>>>> I reported it, but I find this status a pity; we should be able to
>>>> monitor Kafka lag using standard solutions, shouldn't we?
>>>>
>>>> And it's puzzling to me that the page you quote (
>>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-
>>>> ) says "This commits offsets to Kafka" when my experience (and Eric's,
>>>> so it's good not to be alone) tells us that it's not the case.
>>>>
>>>> Could it be that this was an unintentional regression in Storm Kafka
>>>> 1.2.x, and can we hope to have offsets committed to the Kafka brokers?
>>>>
>>>> Best regards,
>>>> Alexandre Vermeerbergen
>>>>
>>>> 2018-05-04 21:51 GMT+02:00 Eric Hokanson <[email protected]>:
>>>>
>>>>> This would be the regular KafkaSpout, i.e.:
>>>>>
>>>>> new KafkaSpout<>(KafkaSpoutConfig.builder("k10server:9092", "topic")
>>>>>         .setProp(p.getComponentProperties("KafkaConsumer"))
>>>>>         .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
>>>>>         .build());
>>>>>
>>>>> The offsets end up in the standard old pre-v0.10 location:
>>>>> /consumers/group.id/offsets/
>>>>>
>>>>> On Fri, May 4, 2018 at 1:41 PM, Stig Rohde Døssing <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Are we talking about the regular KafkaSpout or the Trident spout?
>>>>>>
>>>>>> The regular KafkaSpout uses the KafkaConsumer class under the hood,
>>>>>> and commits offsets via the commitSync method:
>>>>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-
>>>>>> Could you elaborate on what happens in your case, e.g. where in
>>>>>> Zookeeper are the offsets ending up?
>>>>>>
>>>>>> 2018-05-04 19:25 GMT+02:00 Eric Hokanson <[email protected]>:
>>>>>>
>>>>>>> We're working on upgrading our Storm cluster from v1.0.X to v1.2.1.
>>>>>>> We're taking advantage of this upgrade by moving to a newer Kafka
>>>>>>> v0.10.2 server from our older v0.8 server, and using the built-in
>>>>>>> Storm Kafka spout versus the custom Kafka spout we had before. We've
>>>>>>> got everything up and working now on Storm 1.2.1 except for the fact
>>>>>>> that Storm insists that the Kafka offsets should be written to
>>>>>>> Zookeeper instead of to Kafka like they should be on newer
>>>>>>> consumers. I've made sure we're using the latest 1.2.1
>>>>>>> storm-kafka-client and I've tried various versions of the Kafka
>>>>>>> client including the latest v1.1.0, but I can't spot any place where
>>>>>>> this can be specified. What am I missing?
>>>>>>>
>>>>>>> --
>>>>>>> Eric
>>>>>
>>>>> --
>>>>> Eric Hokanson
>>>>> Sr Software Engineer | Return Path
>>>>> w | 303-999-3270
>>>>> m | 970-412-2728
>>>>> [email protected]

--
Eric Hokanson
Sr Software Engineer | Return Path
w | 303-999-3270
m | 970-412-2728
[email protected]
