Okay, that's odd. I think it would be helpful to confirm whether it's a
problem in the Storm code or a problem with your Kafka setups. Could you
try to start a KafkaConsumer manually with a new group id and commit an
offset using either commitSync or commitAsync, and verify that the
committed offset ends up in Kafka and not Zookeeper? Maybe also try doing
the same with the consumer group you're using for the topology (after
backing up the previous offset so you can restore).

Alexandre, your configuration looks good to me.

https://gist.github.com/srdo/e2a75d45a227f1a43e96cd38ab7194d3 has a code
snippet that will set up a consumer to read and commit messages. If you run
this code (and point to your own Kafka url and topic), you should see the
"consumer-test" group show up in the output from kafka-consumer-groups.
Please let me know whether it shows up for you. It would also be helpful if
you'd try with one of the group ids you're using in the topologies.

2018-05-04 23:08 GMT+02:00 Alexandre Vermeerbergen <[email protected]
>:

> Hello Stig,
>
> Yes we set consumer group if for our Kafka spout.
> Here's a 1st example of the way we create our Kafka spouts:
>
> KafkaSpoutConfig<String, String> kafkaSpoutConfig =
> KafkaSpoutConfig.builder(supConfig.kafka_broker_hosts_str, myKafkaTopic)
>
>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, myConsumerId)
>
>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
>
>     .setRecordTranslator(new PodSyncNewPodsKafkaRecordTranslator())
>                                                             .build();
>
> KafkaSpout<String, String> NewPodsFromKafkaSpout = new KafkaSpout<String,
> String>(kafkaSpoutConfig);
>
> and another one:
>
> Builder<String, ?> builder = KafkaSpoutConfig
>                     .builder(supConfig.kafka_broker_hosts_str, myTopic1,
> myTopic2)
>                     .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class)
>                     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class)
>                     .setRecordTranslator(new RawEventKafkaRecordTranslator(
> true));
> builder.setProp(kafkaConsumerProp)
>                 .setTupleTrackingEnforced(true)
>                 .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE)
>                 .setProp(ConsumerConfig.GROUP_ID_CONFIG, consumerId)
>                 .setFirstPollOffsetStrategy(strategy);
> IRichSpout spout = builder.build();
>
> Are we using the right way to set consumer ID ?
>
> Best regards,
> Alexandre Vermeerbergen
>
>
> 2018-05-04 22:46 GMT+02:00 Stig Rohde Døssing <[email protected]>:
>
>> There are a couple other lines that may commit depending on the
>> processing guarantee, https://github.com/apache/stor
>> m/blob/v1.2.1/external/storm-kafka-client/src/main/java/
>> org/apache/storm/kafka/spout/KafkaSpout.java#L391 for AT_MOST_ONCE and
>> https://github.com/apache/storm/blob/v1.2.1/external/storm-
>> kafka-client/src/main/java/org/apache/storm/kafka/spout/
>> KafkaSpout.java#L294 for NO_GUARANTEE, but basically we always use the
>> KafkaConsumer commit* methods to commit offsets.
>>
>> 2018-05-04 22:13 GMT+02:00 Alexandre Vermeerbergen <
>> [email protected]>:
>>
>>> Hello Stig,
>>>
>>> I have no checked where Storm Kafka client 1.2.x commits offset, but I'm
>>> pretty sure they are not committed to Kafka Brokers (hence Eric's mention
>>> to Zookeeper seems realistic to me), because I have a Kafka lag probe
>>> (because on Kafa APIs, not on Storm Kafka tooling) which isn't have to find
>>> the consumers corresponding to the Kafka Spouts of my topologies.
>>>
>>> I reported it, but I find this status a pity, we should be able to
>>> monitoring Kafka lag using standard solutions, aren't we?
>>>
>>> And it's puzzling to me that the page you quote (
>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients
>>> /consumer/KafkaConsumer.html#commitSync-java.util.Map-. ) says " This
>>> commits offsets to Kafka " when my experience (and Eric's good not to be
>>> alone) tells us that it's not the case.
>>>
>>> Could it be that this was an unintentional regression in Storm  Kafka
>>> 1.2.x and that we can hope to have offsets commited in Kafka Brokers ?
>>>
>>> Best regards,
>>> Alexandre Vermeerbergen
>>>
>>>
>>> 2018-05-04 21:51 GMT+02:00 Eric Hokanson <[email protected]>:
>>>
>>>> This would be the regular KafkaSpout, i.e.: new
>>>> KafkaSpout<>(KafkaSpoutConfig.builder("k10server:9092", "topic")
>>>> .setProp(p.getComponentProperties("KafkaConsumer")).setProce
>>>> ssingGuarantee(ProcessingGuarantee.AT_MOST_ONCE).build();
>>>>
>>>> The offsets end up in the standard old pre-v0.10 location: /consumers/
>>>> group.id/offsets/
>>>>
>>>>
>>>>
>>>> On Fri, May 4, 2018 at 1:41 PM, Stig Rohde Døssing <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Are we talking about the regular KafkaSpout or the Trident spout?
>>>>>
>>>>> The regular KafkaSpout uses the KafkaConsumer class under the hood,
>>>>> and commits offsets via the commitSync method
>>>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients
>>>>> /consumer/KafkaConsumer.html#commitSync-java.util.Map-. Could you
>>>>> elaborate on what happens in your case, e.g. where in Zookeeper are the
>>>>> offsets ending up?
>>>>>
>>>>> 2018-05-04 19:25 GMT+02:00 Eric Hokanson <[email protected]
>>>>> >:
>>>>>
>>>>>> We're working on upgrading our Storm cluster from v1.0.X to v1.2.1.
>>>>>> We're taking advantage of this upgrade by moving to a newer Kafka v0.10.2
>>>>>> server from our older v0.8 server and using the built-in new Storm Kafka
>>>>>> spout verses a custom Kafka spout we had before.  We've got everything up
>>>>>> and working now on Storm 1.2.1 except for the fact that Storm insists 
>>>>>> that
>>>>>> the Kafka offsets should be written to Zookeeper instead of to Kafka like
>>>>>> they should be on newer consumers.  I've made sure we're using the latest
>>>>>> 1.2.1 storm-kafka-client and I've tried various versions of the Kafka
>>>>>> client including the latest v1.1.0 but I can't spot any place where this
>>>>>> can be specified.  What am I missing?
>>>>>>
>>>>>> --
>>>>>> Eric
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> *Eric Hokanson*
>>>> Sr Software Engineer | *Return Path*
>>>> w | 303-999-3270
>>>> m | 970-412-2728
>>>> [email protected]
>>>>
>>>>
>>>> <https://returnpath.sigstr.net/uc/58c1b1f3825be97ab9b7975f>
>>>> [image: Powered by Sigstr]
>>>> <https://returnpath.sigstr.net/uc/58c1b1f3825be97ab9b7975f/watermark>
>>>>
>>>
>>>
>>
>

Reply via email to