Sorry for the delay, but this is the bug I encountered:
https://issues.apache.org/jira/browse/KAFKA-3853
On Fri, May 11, 2018 at 6:27 PM Srishty Agrawal <[email protected]>
wrote:

> My team encountered a similar issue yesterday. We are running a topology
> in Storm which uses storm-kafka-client and hence should store the consumer
> offsets in Kafka by default. The consumer-group set in the topology is not
> being listed in the --list command, but shows up when we execute this:
>
>
> ./kafka-consumer-groups.sh --new-consumer --group <consumer-group-name> \
>   --bootstrap-server <kafka-broker-endpoint>:9092 --describe
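>
> (For completeness, the --list invocation that did not show the group would be
> something like: ./kafka-consumer-groups.sh --new-consumer
> --bootstrap-server <kafka-broker-endpoint>:9092 --list)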
>
>
> Also, the result of the --describe command had the CONSUMER-ID, HOST and
> CLIENT-ID columns empty on Kafka 0.10.2.1. We cross-checked that the offsets
> were being written to Kafka by consuming the messages from the
> __consumer_offsets topic and parsing them.
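>
> For anyone who wants to reproduce that cross-check: on 0.10.x brokers the
> internal offsets topic can be read with something like the command below (a
> sketch; the formatter class moved to kafka.coordinator.group.GroupMetadataManager
> in 0.11+, and older setups may need exclude.internal.topics=false in a
> consumer config):
>
> ./kafka-console-consumer.sh --bootstrap-server <kafka-broker-endpoint>:9092 \
>   --topic __consumer_offsets --from-beginning \
>   --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"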
>
> Eric, can you point to the Kafka bug that you came across which addresses
> this issue?
>
> On Mon, May 7, 2018 at 3:44 PM Eric Hokanson <[email protected]>
> wrote:
>
>> I've had some more time to look into this and think it must be a bug in
>> Kafka.
>>
>> I spun up my topology in a clean Docker environment and the offsets went
>> to Kafka fine.  Going back to my production Kafka, I changed the group.id
>> in my test topology and spun it back up.  Oddly, I noticed it was
>> processing and tracking lag fine in the UI but didn't see offsets in either
>> Kafka or Zookeeper.  In Kafka, when I do a --list I don't see the group.id
>> among all my others, but if I ask for it explicitly the Kafka tool crashes
>> with:
>>
>> Error while executing consumer group command Group test-storm-group with
>> protocol type '' is not a valid consumer group
>>
>> Googling around revealed that this might be a bug in Kafka when the
>> consumer uses manual partition assignment rather than the standard group
>> management.  So I think the offsets really are going to Kafka, I just can't
>> see them using the tools.  I think we'll move toward upgrading Kafka to the
>> latest v1.1 in the coming months and see what happens.  Thanks everyone.
>>
>>
>>
>>
>> On Sat, May 5, 2018 at 5:56 AM, Stig Rohde Døssing <[email protected]>
>> wrote:
>>
>>> Okay, that's odd. I think it would be helpful to confirm whether it's a
>>> problem in the Storm code or a problem with your Kafka setups. Could you
>>> try to start a KafkaConsumer manually with a new group id and commit an
>>> offset using either commitSync or commitAsync, and verify that the
>>> committed offset ends up in Kafka and not Zookeeper? Maybe also try doing
>>> the same with the consumer group you're using for the topology (after
>>> backing up the previous offset so you can restore it).
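>>>
>>> For reference, the manual test could look roughly like this (a minimal
>>> sketch; the broker address, topic name and group id are placeholders):
>>>
>>> import java.util.Collections;
>>> import java.util.Properties;
>>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>
>>> Properties props = new Properties();
>>> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
>>> props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-test"); // fresh group id
>>> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>>> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>>> props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>>
>>> try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>>>     consumer.subscribe(Collections.singletonList("your-topic")); // placeholder topic
>>>     consumer.poll(5000); // joins the group and fetches; newer clients take a Duration
>>>     consumer.commitSync(); // commits the current positions for the assigned partitions
>>> }
>>>
>>> If commits really go to the brokers, the "manual-commit-test" group should
>>> then show up in kafka-consumer-groups.sh --describe.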
>>>
>>> Alexandre, your configuration looks good to me.
>>>
>>> https://gist.github.com/srdo/e2a75d45a227f1a43e96cd38ab7194d3 has a
>>> code snippet that will set up a consumer to read and commit messages. If
>>> you run this code (and point to your own Kafka url and topic), you should
>>> see the "consumer-test" group show up in the output from
>>> kafka-consumer-groups. Please let me know whether it shows up for you. It
>>> would also be helpful if you'd try with one of the group ids you're using
>>> in the topologies.
>>>
>>> 2018-05-04 23:08 GMT+02:00 Alexandre Vermeerbergen <
>>> [email protected]>:
>>>
>>>> Hello Stig,
>>>>
>>>> Yes, we set the consumer group id for our Kafka spout.
>>>> Here's a first example of the way we create our Kafka spouts:
>>>>
>>>> KafkaSpoutConfig<String, String> kafkaSpoutConfig =
>>>>     KafkaSpoutConfig.builder(supConfig.kafka_broker_hosts_str, myKafkaTopic)
>>>>         .setProp(ConsumerConfig.GROUP_ID_CONFIG, myConsumerId)
>>>>         .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
>>>>         .setRecordTranslator(new PodSyncNewPodsKafkaRecordTranslator())
>>>>         .build();
>>>> KafkaSpout<String, String> NewPodsFromKafkaSpout =
>>>>     new KafkaSpout<String, String>(kafkaSpoutConfig);
>>>>
>>>> and another one:
>>>>
>>>> Builder<String, ?> builder = KafkaSpoutConfig
>>>>     .builder(supConfig.kafka_broker_hosts_str, myTopic1, myTopic2)
>>>>     .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
>>>>     .setRecordTranslator(new RawEventKafkaRecordTranslator(true));
>>>> builder.setProp(kafkaConsumerProp)
>>>>     .setTupleTrackingEnforced(true)
>>>>     .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE)
>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, consumerId)
>>>>     .setFirstPollOffsetStrategy(strategy);
>>>> IRichSpout spout = new KafkaSpout<>(builder.build());
>>>>
>>>> Are we using the right way to set the consumer group id?
>>>>
>>>> Best regards,
>>>> Alexandre Vermeerbergen
>>>>
>>>>
>>>> 2018-05-04 22:46 GMT+02:00 Stig Rohde Døssing <[email protected]>:
>>>>
>>>>> There are a couple other lines that may commit depending on the
>>>>> processing guarantee,
>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L391
>>>>> for AT_MOST_ONCE and
>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L294
>>>>> for NO_GUARANTEE, but basically we always use the KafkaConsumer commit*
>>>>> methods to commit offsets.
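>>>>>
>>>>> In other words, the commit the spout issues has roughly this shape (a
>>>>> sketch with made-up topic/partition/offset values; TopicPartition lives in
>>>>> org.apache.kafka.common, OffsetAndMetadata in org.apache.kafka.clients.consumer):
>>>>>
>>>>> Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
>>>>> toCommit.put(new TopicPartition("some-topic", 0), new OffsetAndMetadata(42L));
>>>>> consumer.commitSync(toCommit); // written to the brokers' __consumer_offsets topic, not Zookeeper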
>>>>>
>>>>> 2018-05-04 22:13 GMT+02:00 Alexandre Vermeerbergen <
>>>>> [email protected]>:
>>>>>
>>>>>> Hello Stig,
>>>>>>
>>>>>> I have not checked where Storm Kafka client 1.2.x commits offsets, but
>>>>>> I'm pretty sure they are not committed to the Kafka brokers (hence Eric's
>>>>>> mention of Zookeeper seems realistic to me), because I have a Kafka lag
>>>>>> probe (based on Kafka APIs, not on Storm Kafka tooling) which isn't able
>>>>>> to find the consumers corresponding to the Kafka spouts of my topologies.
>>>>>>
>>>>>> I reported it, but I find this situation a pity; we should be able to
>>>>>> monitor Kafka lag using standard solutions, shouldn't we?
>>>>>>
>>>>>> And it's puzzling to me that the page you quote (
>>>>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-
>>>>>> ) says "This commits offsets to Kafka" when my experience (and Eric's;
>>>>>> it's good not to be alone) tells us that it's not the case.
>>>>>>
>>>>>> Could it be that this was an unintentional regression in storm-kafka-client
>>>>>> 1.2.x, and can we hope to have offsets committed to the Kafka brokers?
>>>>>>
>>>>>> Best regards,
>>>>>> Alexandre Vermeerbergen
>>>>>>
>>>>>>
>>>>>> 2018-05-04 21:51 GMT+02:00 Eric Hokanson <
>>>>>> [email protected]>:
>>>>>>
>>>>>>> This would be the regular KafkaSpout, i.e.:
>>>>>>>
>>>>>>> new KafkaSpout<>(KafkaSpoutConfig.builder("k10server:9092", "topic")
>>>>>>>     .setProp(p.getComponentProperties("KafkaConsumer"))
>>>>>>>     .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
>>>>>>>     .build());
>>>>>>>
>>>>>>> The offsets end up in the standard old pre-v0.10 location:
>>>>>>> /consumers/group.id/offsets/
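>>>>>>>
>>>>>>> (For anyone wanting to verify: something like
>>>>>>> ./zookeeper-shell.sh <zk-host>:2181 ls /consumers/<group.id>/offsets
>>>>>>> should show them -- host and group are placeholders.)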
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, May 4, 2018 at 1:41 PM, Stig Rohde Døssing <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Are we talking about the regular KafkaSpout or the Trident spout?
>>>>>>>>
>>>>>>>> The regular KafkaSpout uses the KafkaConsumer class under the hood,
>>>>>>>> and commits offsets via the commitSync method
>>>>>>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-.
>>>>>>>> Could you elaborate on what happens in your case, e.g. where in 
>>>>>>>> Zookeeper
>>>>>>>> are the offsets ending up?
>>>>>>>>
>>>>>>>> 2018-05-04 19:25 GMT+02:00 Eric Hokanson <
>>>>>>>> [email protected]>:
>>>>>>>>
>>>>>>>>> We're working on upgrading our Storm cluster from v1.0.X to
>>>>>>>>> v1.2.1.  We're taking advantage of this upgrade by moving to a newer 
>>>>>>>>> Kafka
>>>>>>>>> v0.10.2 server from our older v0.8 server and using the built-in new 
>>>>>>>>> Storm
>>>>>>>>> Kafka spout versus a custom Kafka spout we had before.  We've got
>>>>>>>>> everything up and working now on Storm 1.2.1 except for the fact that 
>>>>>>>>> Storm
>>>>>>>>> insists that the Kafka offsets should be written to Zookeeper instead 
>>>>>>>>> of to
>>>>>>>>> Kafka like they should be on newer consumers.  I've made sure we're 
>>>>>>>>> using
>>>>>>>>> the latest 1.2.1 storm-kafka-client and I've tried various versions 
>>>>>>>>> of the
>>>>>>>>> Kafka client including the latest v1.1.0 but I can't spot any place 
>>>>>>>>> where
>>>>>>>>> this can be specified.  What am I missing?
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Eric
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>
>
> --
> Thanks,
> Srishty Agrawal
>


-- 
*Eric Hokanson*
Sr Software Engineer | *Return Path*
w | 303-999-3270
m | 970-412-2728
[email protected]

