Sorry for the delay, but this is the bug I've encountered: https://issues.apache.org/jira/browse/KAFKA-3853
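(For anyone hitting the same symptom discussed below -- offsets clearly being committed, but the group missing from --list and kafka-consumer-groups crashing on --describe -- here is a minimal sketch that reads a group's committed offsets directly through the consumer API, sidestepping the tool. The broker address and topic name are placeholder assumptions; the group id echoes the one from the error message later in this thread.)

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class CommittedOffsetCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "test-storm-group");        // the group the tool rejects
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // No subscribe/assign: the consumer never joins the group, it only
            // asks the brokers for the group's committed offsets.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                List<PartitionInfo> partitions = consumer.partitionsFor("my-topic"); // placeholder topic
                for (PartitionInfo p : partitions) {
                    TopicPartition tp = new TopicPartition("my-topic", p.partition());
                    OffsetAndMetadata committed = consumer.committed(tp);
                    System.out.println(tp + " -> "
                            + (committed == null ? "no committed offset" : committed.offset()));
                }
            }
        }
    }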
On Fri, May 11, 2018 at 6:27 PM Srishty Agrawal <[email protected]> wrote:

> My team encountered a similar issue yesterday. We are running a topology in Storm which uses storm-kafka-client and hence should store the consumer offsets in Kafka by default. The consumer group set in the topology is not listed by the --list command, but shows up when we execute this:
>
> ./kafka-consumer-groups.sh --new-consumer --group <consumer-group-name> --bootstrap-server <kafka-broker-endpoint>:9092 --describe
>
> Also, the result of this command had the CONSUMER-ID, HOST and CLIENT-ID columns empty for Kafka 0.10.2.1. We cross-checked that the offsets were being written to Kafka by consuming the messages from the offsets topic and parsing them.
>
> Eric, can you point to the Kafka bug that you came across which addresses this issue?
>
> On Mon, May 7, 2018 at 3:44 PM Eric Hokanson <[email protected]> wrote:
>
>> I've had some more time to look into this and think it must be a bug in Kafka.
>>
>> I spun up my topology in a clean Docker environment and the offsets went to Kafka fine. Going back to my production Kafka, I changed the group.id in my test topology and spun it back up. Oddly, I noticed it was processing and tracking lag fine in the UI, but I didn't see offsets in either Kafka or Zookeeper. In Kafka, when I do a --list I don't see the group.id among all my others, but if I ask for it explicitly the Kafka tool crashes with:
>>
>> Error while executing consumer group command Group test-storm-group with protocol type '' is not a valid consumer group
>>
>> Googling around revealed that this might be a bug in Kafka when a non-standard partitioner is in use in the consumer. So I think the offsets really are going to Kafka, I just can't see them using the tools. I think we'll move toward upgrading Kafka to the latest v1.1 in the coming months and see what happens. Thanks everyone.
>>
>> On Sat, May 5, 2018 at 5:56 AM, Stig Rohde Døssing <[email protected]> wrote:
>>
>>> Okay, that's odd. I think it would be helpful to confirm whether it's a problem in the Storm code or a problem with your Kafka setups. Could you try to start a KafkaConsumer manually with a new group id and commit an offset using either commitSync or commitAsync, and verify that the committed offset ends up in Kafka and not Zookeeper? Maybe also try doing the same with the consumer group you're using for the topology (after backing up the previous offset so you can restore it).
>>>
>>> Alexandre, your configuration looks good to me.
>>>
>>> https://gist.github.com/srdo/e2a75d45a227f1a43e96cd38ab7194d3 has a code snippet that will set up a consumer to read and commit messages. If you run this code (and point it at your own Kafka url and topic), you should see the "consumer-test" group show up in the output from kafka-consumer-groups. Please let me know whether it shows up for you. It would also be helpful if you'd try with one of the group ids you're using in the topologies.
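(A minimal sketch of the manual commit test Stig describes here -- the linked gist is his full version; the broker address, topic name, and poll timeout below are placeholder assumptions:)

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ManualCommitTest {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "consumer-test");           // fresh group id, as in the gist
            props.put("enable.auto.commit", "false");         // we commit explicitly below
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(5000);
                System.out.println("Polled " + records.count() + " records");
                // commitSync() commits the offsets from the last poll to Kafka's
                // internal offsets topic, never to Zookeeper.
                consumer.commitSync();
            }
        }
    }

(If the commit lands in Kafka, the "consumer-test" group should then show up in kafka-consumer-groups, per Stig's description above.)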
>>> 2018-05-04 23:08 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
>>>
>>>> Hello Stig,
>>>>
>>>> Yes, we set a consumer group id for our Kafka spouts. Here's a first example of the way we create them:
>>>>
>>>> KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
>>>>         .builder(supConfig.kafka_broker_hosts_str, myKafkaTopic)
>>>>         .setProp(ConsumerConfig.GROUP_ID_CONFIG, myConsumerId)
>>>>         .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
>>>>         .setRecordTranslator(new PodSyncNewPodsKafkaRecordTranslator())
>>>>         .build();
>>>> KafkaSpout<String, String> newPodsFromKafkaSpout =
>>>>         new KafkaSpout<String, String>(kafkaSpoutConfig);
>>>>
>>>> and another one:
>>>>
>>>> Builder<String, ?> builder = KafkaSpoutConfig
>>>>         .builder(supConfig.kafka_broker_hosts_str, myTopic1, myTopic2)
>>>>         .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
>>>>         .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
>>>>         .setRecordTranslator(new RawEventKafkaRecordTranslator(true));
>>>> builder.setProp(kafkaConsumerProp)
>>>>         .setTupleTrackingEnforced(true)
>>>>         .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE)
>>>>         .setProp(ConsumerConfig.GROUP_ID_CONFIG, consumerId)
>>>>         .setFirstPollOffsetStrategy(strategy);
>>>> IRichSpout spout = new KafkaSpout<>(builder.build());
>>>>
>>>> Are we setting the consumer group id the right way?
>>>>
>>>> Best regards,
>>>> Alexandre Vermeerbergen
>>>>
>>>> 2018-05-04 22:46 GMT+02:00 Stig Rohde Døssing <[email protected]>:
>>>>
>>>>> There are a couple of other lines that may commit depending on the processing guarantee, https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L391 for AT_MOST_ONCE and https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L294 for NO_GUARANTEE, but basically we always use the KafkaConsumer commit* methods to commit offsets.
>>>>>
>>>>> 2018-05-04 22:13 GMT+02:00 Alexandre Vermeerbergen <[email protected]>:
>>>>>
>>>>>> Hello Stig,
>>>>>>
>>>>>> I have not checked where Storm Kafka client 1.2.x commits offsets, but I'm pretty sure they are not committed to the Kafka brokers (hence Eric's mention of Zookeeper seems realistic to me), because I have a Kafka lag probe (based on the Kafka APIs, not on Storm Kafka tooling) which isn't able to find the consumers corresponding to the Kafka spouts of my topologies.
>>>>>>
>>>>>> I reported it, but I find this situation a pity; we should be able to monitor Kafka lag using standard solutions, shouldn't we?
>>>>>>
>>>>>> And it's puzzling to me that the page you quote (https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-) says "This commits offsets to Kafka" when my experience (and Eric's; it's good not to be alone) tells us that it's not the case.
>>>>>>
>>>>>> Could it be that this was an unintentional regression in Storm Kafka 1.2.x, and can we hope to have offsets committed to the Kafka brokers?
>>>>>>
>>>>>> Best regards,
>>>>>> Alexandre Vermeerbergen
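(Lag can also be computed with the plain consumer API that Alexandre's probe uses, by comparing the group's committed offsets against the log-end offsets. A minimal sketch -- broker, group id, and topic are placeholder assumptions, and KafkaConsumer#endOffsets needs kafka-clients 0.10.1 or later:)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class LagCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "my-storm-group");          // group id used by the spout
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> tps = new ArrayList<>();
                for (PartitionInfo p : consumer.partitionsFor("my-topic")) { // placeholder topic
                    tps.add(new TopicPartition("my-topic", p.partition()));
                }
                // Log-end offsets per partition, fetched without joining the group.
                Map<TopicPartition, Long> ends = consumer.endOffsets(tps);
                for (TopicPartition tp : tps) {
                    OffsetAndMetadata committed = consumer.committed(tp);
                    long lag = ends.get(tp) - (committed == null ? 0L : committed.offset());
                    System.out.println(tp + " lag=" + lag);
                }
            }
        }
    }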
>>>>>> 2018-05-04 21:51 GMT+02:00 Eric Hokanson <[email protected]>:
>>>>>>
>>>>>>> This would be the regular KafkaSpout, i.e.:
>>>>>>>
>>>>>>> new KafkaSpout<>(KafkaSpoutConfig.builder("k10server:9092", "topic")
>>>>>>>         .setProp(p.getComponentProperties("KafkaConsumer"))
>>>>>>>         .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
>>>>>>>         .build());
>>>>>>>
>>>>>>> The offsets end up in the standard old pre-v0.10 location: /consumers/group.id/offsets/
>>>>>>>
>>>>>>> On Fri, May 4, 2018 at 1:41 PM, Stig Rohde Døssing <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Are we talking about the regular KafkaSpout or the Trident spout?
>>>>>>>>
>>>>>>>> The regular KafkaSpout uses the KafkaConsumer class under the hood, and commits offsets via the commitSync method: https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-. Could you elaborate on what happens in your case, e.g. where in Zookeeper are the offsets ending up?
>>>>>>>>
>>>>>>>> 2018-05-04 19:25 GMT+02:00 Eric Hokanson <[email protected]>:
>>>>>>>>
>>>>>>>>> We're working on upgrading our Storm cluster from v1.0.X to v1.2.1. We're taking advantage of this upgrade by moving to a newer Kafka v0.10.2 server from our older v0.8 server, and by using the built-in new Storm Kafka spout versus a custom Kafka spout we had before. We've got everything up and working now on Storm 1.2.1, except for the fact that Storm insists on writing the Kafka offsets to Zookeeper instead of to Kafka, like they should be for newer consumers. I've made sure we're using the latest 1.2.1 storm-kafka-client, and I've tried various versions of the Kafka client, including the latest v1.1.0, but I can't spot any place where this can be specified. What am I missing?
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Eric
>
> --
> Thanks,
> Srishty Agrawal

--
Eric Hokanson
Sr Software Engineer | Return Path
w | 303-999-3270
m | 970-412-2728
[email protected]
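(On Stig's question above about where in Zookeeper the offsets end up: the pre-v0.10 path Eric mentions can be inspected directly with the ZooKeeper client. A minimal sketch -- the connection string and group name are placeholder assumptions:)

    import java.util.List;

    import org.apache.zookeeper.ZooKeeper;

    public class ZkOffsetCheck {
        public static void main(String[] args) throws Exception {
            // Placeholder connection string; point this at your own ZK ensemble.
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
            try {
                // Old Zookeeper-based offset location, as described above.
                // getChildren throws NoNodeException if the group has no ZK offsets.
                String base = "/consumers/test-storm-group/offsets";
                for (String topic : zk.getChildren(base, false)) {
                    for (String partition : zk.getChildren(base + "/" + topic, false)) {
                        byte[] data = zk.getData(base + "/" + topic + "/" + partition, false, null);
                        System.out.println(topic + "-" + partition + " -> " + new String(data, "UTF-8"));
                    }
                }
            } finally {
                zk.close();
            }
        }
    }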
