Hello Stig,
Yes, we set the consumer group ID for our Kafka spouts.
Here's a first example of how we create our Kafka spouts:
KafkaSpoutConfig<String, String> kafkaSpoutConfig =
    KafkaSpoutConfig.builder(supConfig.kafka_broker_hosts_str, myKafkaTopic)
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, myConsumerId)
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
        .setRecordTranslator(new PodSyncNewPodsKafkaRecordTranslator())
        .build();
KafkaSpout<String, String> NewPodsFromKafkaSpout =
    new KafkaSpout<String, String>(kafkaSpoutConfig);
and another one:
Builder<String, ?> builder = KafkaSpoutConfig
    .builder(supConfig.kafka_broker_hosts_str, myTopic1, myTopic2)
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
    .setRecordTranslator(new RawEventKafkaRecordTranslator(true));
builder.setProp(kafkaConsumerProp)
    .setTupleTrackingEnforced(true)
    .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE)
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, consumerId)
    .setFirstPollOffsetStrategy(strategy);
IRichSpout spout = new KafkaSpout<>(builder.build());
Is this the right way to set the consumer group ID?
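
For context, here is a minimal sketch of the lag arithmetic that a Kafka-API-based probe would apply to a consumer group. It is only an illustration under stated assumptions: the class LagProbeSketch, its methods, and the sample offsets are hypothetical, and the two maps are hard-coded here, whereas a real probe would fill them from KafkaConsumer.committed(TopicPartition) and KafkaConsumer.endOffsets(...) using the spout's group.id. A partition with no committed offset is what one would expect to see if the spout's offsets never reach the brokers.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical helper: per-partition lag from committed and log-end offsets. */
public class LagProbeSketch {

    /**
     * Lag for one partition. An empty result means the group has no
     * committed offset for that partition on the brokers.
     */
    public static OptionalLong lag(Long committedOffset, long logEndOffset) {
        if (committedOffset == null) {
            return OptionalLong.empty();
        }
        // Clamp at zero in case the two offsets were read at slightly different times.
        return OptionalLong.of(Math.max(0L, logEndOffset - committedOffset));
    }

    /** Computes the lag for every partition that has a known log-end offset. */
    public static Map<String, OptionalLong> lagByPartition(
            Map<String, Long> committed, Map<String, Long> logEnd) {
        Map<String, OptionalLong> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : logEnd.entrySet()) {
            result.put(e.getKey(), lag(committed.get(e.getKey()), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample data; keys are "topic-partition" strings.
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("myTopic1-0", 120L);

        Map<String, Long> logEnd = new LinkedHashMap<>();
        logEnd.put("myTopic1-0", 150L);
        logEnd.put("myTopic1-1", 40L);

        lagByPartition(committed, logEnd).forEach((partition, l) ->
            System.out.println(partition + " -> "
                + (l.isPresent() ? String.valueOf(l.getAsLong()) : "no committed offset")));
    }
}
```

With the sample data above, main reports a lag of 30 for myTopic1-0 and "no committed offset" for myTopic1-1.
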
Best regards,
Alexandre Vermeerbergen
2018-05-04 22:46 GMT+02:00 Stig Rohde Døssing <[email protected]>:
> There are a couple of other lines that may commit depending on the
> processing guarantee:
> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L391
> for AT_MOST_ONCE and
> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L294
> for NO_GUARANTEE, but basically we always use the KafkaConsumer commit*
> methods to commit offsets.
>
> 2018-05-04 22:13 GMT+02:00 Alexandre Vermeerbergen <
> [email protected]>:
>
>> Hello Stig,
>>
>> I have not checked where Storm Kafka client 1.2.x commits offsets, but I'm
>> pretty sure they are not committed to the Kafka brokers (hence Eric's mention
>> of Zookeeper seems realistic to me), because I have a Kafka lag probe
>> (based on Kafka APIs, not on Storm Kafka tooling) which isn't able to find
>> the consumers corresponding to the Kafka spouts of my topologies.
>>
>> I reported it, but I find this situation a pity: we should be able to
>> monitor Kafka lag using standard solutions, shouldn't we?
>>
>> And it's puzzling to me that the page you quote
>> (https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-)
>> says "This commits offsets to Kafka" when my experience (and Eric's; it's
>> good not to be alone) tells us that it's not the case.
>>
>> Could it be that this was an unintentional regression in Storm Kafka
>> 1.2.x and that we can hope to have offsets committed to the Kafka brokers?
>>
>> Best regards,
>> Alexandre Vermeerbergen
>>
>>
>> 2018-05-04 21:51 GMT+02:00 Eric Hokanson <[email protected]>:
>>
>>> This would be the regular KafkaSpout, i.e.:
>>> new KafkaSpout<>(KafkaSpoutConfig.builder("k10server:9092", "topic")
>>>     .setProp(p.getComponentProperties("KafkaConsumer"))
>>>     .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
>>>     .build());
>>>
>>> The offsets end up in the standard old pre-v0.10 location:
>>> /consumers/group.id/offsets/
>>>
>>>
>>>
>>> On Fri, May 4, 2018 at 1:41 PM, Stig Rohde Døssing <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Are we talking about the regular KafkaSpout or the Trident spout?
>>>>
>>>> The regular KafkaSpout uses the KafkaConsumer class under the hood, and
>>>> commits offsets via the commitSync method:
>>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-
>>>> Could you elaborate on what happens in your case, e.g. where in Zookeeper
>>>> are the offsets ending up?
>>>>
>>>> 2018-05-04 19:25 GMT+02:00 Eric Hokanson <[email protected]>:
>>>>
>>>>> We're working on upgrading our Storm cluster from v1.0.X to v1.2.1.
>>>>> We're taking advantage of this upgrade by moving to a newer Kafka v0.10.2
>>>>> server from our older v0.8 server and using the new built-in Storm Kafka
>>>>> spout versus the custom Kafka spout we had before. We've got everything up
>>>>> and working now on Storm 1.2.1 except for the fact that Storm insists that
>>>>> the Kafka offsets should be written to Zookeeper instead of to Kafka like
>>>>> they should be on newer consumers. I've made sure we're using the latest
>>>>> 1.2.1 storm-kafka-client and I've tried various versions of the Kafka
>>>>> client including the latest v1.1.0 but I can't spot any place where this
>>>>> can be specified. What am I missing?
>>>>>
>>>>> --
>>>>> Eric
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *Eric Hokanson*
>>> Sr Software Engineer | *Return Path*
>>> w | 303-999-3270
>>> m | 970-412-2728
>>> [email protected]
>>>
>>>
>>>
>>
>>
>