I've had some more time to look into this and think it must be a bug in
Kafka.

I spun up my topology in a clean Docker environment and the offsets went to
Kafka fine.  Going back to my production Kafka, I changed the group.id in my
test topology and spun it back up.  Oddly, it was processing and tracking
lag fine in the UI, but I didn't see offsets in either Kafka or Zookeeper.
When I run the Kafka tool with --list I don't see the group.id among all my
others, but if I ask for it explicitly the tool crashes with:

Error while executing consumer group command Group test-storm-group with
protocol type '' is not a valid consumer group

Googling around revealed that this might be a bug in Kafka when the
non-standard partitioner is in use in the consumer.  So I think the offsets
really are going to Kafka; I just can't see them using the tools.  I think
we'll move toward upgrading Kafka to the latest v1.1 in the coming months
and see what happens.  Thanks everyone.
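
For anyone who wants to repeat the manual check Stig suggested in the quoted
message below, here is a minimal sketch of the consumer settings I would
assume for it. It only builds the properties (standard Kafka consumer config
keys) and prints two of them. The class name ConsumerTestConfig and the
localhost:9092 address are placeholders, not something taken from the thread;
the "consumer-test" group id is the one from Stig's gist description. The
comments note where a real KafkaConsumer from kafka-clients would take over.

```java
import java.util.Properties;

public class ConsumerTestConfig {

    // Builds settings for a throwaway consumer used only to check where
    // committed offsets end up. All keys are standard Kafka consumer configs.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Commit explicitly so the test controls exactly when an offset is written.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address; point this at your own cluster.
        Properties props = build("localhost:9092", "consumer-test");
        System.out.println("group.id = " + props.getProperty("group.id"));
        System.out.println("enable.auto.commit = "
                + props.getProperty("enable.auto.commit"));
        // With kafka-clients on the classpath, the next steps would be roughly:
        //   new KafkaConsumer<String, String>(props), subscribe to the topic,
        //   poll once, call commitSync(), then look for the group with
        //   kafka-consumer-groups.
    }
}
```

Using a fresh group id keeps the test from disturbing the offsets of a
running topology, which is why the group is passed in as a parameter.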




On Sat, May 5, 2018 at 5:56 AM, Stig Rohde Døssing <[email protected]> wrote:

> Okay, that's odd. I think it would be helpful to confirm whether it's a
> problem in the Storm code or a problem with your Kafka setups. Could you
> try to start a KafkaConsumer manually with a new group id and commit an
> offset using either commitSync or commitAsync, and verify that the
> committed offset ends up in Kafka and not Zookeeper? Maybe also try doing
> the same with the consumer group you're using for the topology (after
> backing up the previous offset so you can restore).
>
> Alexandre, your configuration looks good to me.
>
> https://gist.github.com/srdo/e2a75d45a227f1a43e96cd38ab7194d3 has a code
> snippet that will set up a consumer to read and commit messages. If you run
> this code (and point to your own Kafka url and topic), you should see the
> "consumer-test" group show up in the output from kafka-consumer-groups.
> Please let me know whether it shows up for you. It would also be helpful if
> you'd try with one of the group ids you're using in the topologies.
>
> 2018-05-04 23:08 GMT+02:00 Alexandre Vermeerbergen <
> [email protected]>:
>
>> Hello Stig,
>>
>> Yes, we set the consumer group id for our Kafka spout.
>> Here's a first example of the way we create our Kafka spouts:
>>
>> KafkaSpoutConfig<String, String> kafkaSpoutConfig =
>>     KafkaSpoutConfig.builder(supConfig.kafka_broker_hosts_str, myKafkaTopic)
>>         .setProp(ConsumerConfig.GROUP_ID_CONFIG, myConsumerId)
>>         .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
>>         .setRecordTranslator(new PodSyncNewPodsKafkaRecordTranslator())
>>         .build();
>>
>> KafkaSpout<String, String> NewPodsFromKafkaSpout =
>>     new KafkaSpout<String, String>(kafkaSpoutConfig);
>>
>> and another one:
>>
>> Builder<String, ?> builder = KafkaSpoutConfig
>>     .builder(supConfig.kafka_broker_hosts_str, myTopic1, myTopic2)
>>     .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
>>     .setRecordTranslator(new RawEventKafkaRecordTranslator(true));
>> builder.setProp(kafkaConsumerProp)
>>     .setTupleTrackingEnforced(true)
>>     .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE)
>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, consumerId)
>>     .setFirstPollOffsetStrategy(strategy);
>> IRichSpout spout = builder.build();
>>
>> Are we setting the consumer ID the right way?
>>
>> Best regards,
>> Alexandre Vermeerbergen
>>
>>
>> 2018-05-04 22:46 GMT+02:00 Stig Rohde Døssing <[email protected]>:
>>
>>> There are a couple of other lines that may commit depending on the
>>> processing guarantee:
>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L391
>>> for AT_MOST_ONCE and
>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L294
>>> for NO_GUARANTEE. But basically we always use the KafkaConsumer commit*
>>> methods to commit offsets.
>>>
>>> 2018-05-04 22:13 GMT+02:00 Alexandre Vermeerbergen <
>>> [email protected]>:
>>>
>>>> Hello Stig,
>>>>
>>>> I have not checked where Storm Kafka client 1.2.x commits offsets, but
>>>> I'm pretty sure they are not committed to the Kafka brokers (hence Eric's
>>>> mention of Zookeeper seems realistic to me), because I have a Kafka lag
>>>> probe (based on Kafka APIs, not on Storm Kafka tooling) which isn't able
>>>> to find the consumers corresponding to the Kafka spouts of my topologies.
>>>>
>>>> I reported it, but I find this situation a pity: we should be able to
>>>> monitor Kafka lag using standard solutions, shouldn't we?
>>>>
>>>> And it's puzzling to me that the page you quote (
>>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-
>>>> ) says "This commits offsets to Kafka" when my experience (and Eric's;
>>>> good not to be alone) tells us that it's not the case.
>>>>
>>>> Could it be that this was an unintentional regression in Storm Kafka
>>>> 1.2.x, and that we can hope to have offsets committed to the Kafka brokers?
>>>>
>>>> Best regards,
>>>> Alexandre Vermeerbergen
>>>>
>>>>
>>>> 2018-05-04 21:51 GMT+02:00 Eric Hokanson <[email protected]>:
>>>>
>>>>> This would be the regular KafkaSpout, i.e.:
>>>>>
>>>>> new KafkaSpout<>(KafkaSpoutConfig.builder("k10server:9092", "topic")
>>>>>     .setProp(p.getComponentProperties("KafkaConsumer"))
>>>>>     .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
>>>>>     .build());
>>>>>
>>>>> The offsets end up in the standard old pre-v0.10 location:
>>>>> /consumers/group.id/offsets/
>>>>>
>>>>>
>>>>>
>>>>> On Fri, May 4, 2018 at 1:41 PM, Stig Rohde Døssing <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Are we talking about the regular KafkaSpout or the Trident spout?
>>>>>>
>>>>>> The regular KafkaSpout uses the KafkaConsumer class under the hood,
>>>>>> and commits offsets via the commitSync method:
>>>>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-
>>>>>> Could you elaborate on what happens in your case, e.g. where in
>>>>>> Zookeeper are the offsets ending up?
>>>>>>
>>>>>> 2018-05-04 19:25 GMT+02:00 Eric Hokanson <
>>>>>> [email protected]>:
>>>>>>
>>>>>>> We're working on upgrading our Storm cluster from v1.0.X to v1.2.1.
>>>>>>> We're taking advantage of this upgrade by moving to a newer Kafka
>>>>>>> v0.10.2 server from our older v0.8 server and using the built-in new
>>>>>>> Storm Kafka spout versus a custom Kafka spout we had before.  We've
>>>>>>> got everything up and working now on Storm 1.2.1 except for the fact
>>>>>>> that Storm insists that the Kafka offsets should be written to
>>>>>>> Zookeeper instead of to Kafka like they should be on newer consumers.
>>>>>>> I've made sure we're using the latest 1.2.1 storm-kafka-client and
>>>>>>> I've tried various versions of the Kafka client including the latest
>>>>>>> v1.1.0 but I can't spot any place where this can be specified.  What
>>>>>>> am I missing?
>>>>>>>
>>>>>>> --
>>>>>>> Eric
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> *Eric Hokanson*
>>>>> Sr Software Engineer | *Return Path*
>>>>> w | 303-999-3270
>>>>> m | 970-412-2728
>>>>> [email protected]
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
*Eric Hokanson*
Sr Software Engineer | *Return Path*
w | 303-999-3270
m | 970-412-2728
[email protected]

