[ https://issues.apache.org/jira/browse/KAFKA-20275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18064191#comment-18064191 ]
Werner Daehn commented on KAFKA-20275:
--------------------------------------
Hmm, the first part of your response sounds exactly like what I am looking for,
but I cannot relate to the test case.
Let me show you code snippets and the error message. The code does exactly what
you said: the consumer is created with a group.id, it does an assign and a
commit at the end, yet the commit fails with "Broker: Unknown member".
And that makes sense to me: one process cannot commit something somebody else
is working on. Only if you have subscribed to the partition do you control the
commit. But that is not what you are saying, is it?
{code:python}
from confluent_kafka import Consumer, TopicPartition

conf = {
    'bootstrap.servers': self.settings.kafka_server,
    'security.protocol': self.settings.kafka_protocol,
    'enable.ssl.certificate.verification': 'false',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': self.settings.kafka_user,
    'sasl.password': self.settings.kafka_password,
    'group.id': 'DataLakeWriter_APP',
    'enable.auto.commit': 'false',
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
commit_offset = BEGIN_OFFSET
topicpartition = TopicPartition(topic, partition=partition)
# Manual assignment only: there is no subscribe() call anywhere.
consumer.assign([topicpartition])
while ...:
    msg = consumer.poll(10.0)
    ...
    # The offset to commit is the position of the next message to read.
    commit_offset = msg.offset() + 1
# Synchronous commit at the end; this is the call that fails.
consumer.commit(offsets=[TopicPartition(topic, partition=partition,
                                        offset=commit_offset)],
                asynchronous=False)
{code}
The result of that was
{code:java}
Commit failed: Broker: Unknown member {code}
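For comparison, here is a minimal sketch of setting a group's committed offset through the admin client instead of a consumer. It assumes the installed confluent-kafka-python version provides AdminClient.alter_consumer_group_offsets. The names OffsetReset, reset_after and apply_reset are made up for illustration. The sketch does not establish whether the broker accepts the change while the group still has active members.

```python
from typing import NamedTuple


class OffsetReset(NamedTuple):
    """Hypothetical container describing one offset change."""
    group_id: str
    topic: str
    partition: int
    offset: int  # next offset the group should read from


def reset_after(group_id, topic, partition, last_processed_offset):
    # Same convention as the snippet above: commit last processed offset + 1.
    return OffsetReset(group_id, topic, partition, last_processed_offset + 1)


def apply_reset(reset, admin_conf):
    # Imported lazily so reset_after() works without the Kafka client installed.
    from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition
    from confluent_kafka.admin import AdminClient

    admin = AdminClient(admin_conf)
    request = ConsumerGroupTopicPartitions(
        reset.group_id,
        [TopicPartition(reset.topic, reset.partition, reset.offset)],
    )
    # Returns a dict of futures keyed by group id.
    futures = admin.alter_consumer_group_offsets([request])
    result = futures[reset.group_id].result()
    # Per-partition errors are reported on the returned TopicPartition objects.
    for tp in result.topic_partitions:
        if tp.error is not None:
            raise RuntimeError(
                f"offset change failed for {tp.topic}/{tp.partition}: {tp.error}")
    return result
```

To rerun the load from offset 100 for topic1/partition0, this would be called as apply_reset(reset_after('DataLakeWriter_APP', 'topic1', 0, 99), admin_conf), where admin_conf carries the same connection and SASL settings as the consumer configuration.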
> API to set commit offsets without being subscribed
> --------------------------------------------------
>
> Key: KAFKA-20275
> URL: https://issues.apache.org/jira/browse/KAFKA-20275
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Affects Versions: 4.1.1
> Reporter: Werner Daehn
> Priority: Minor
>
> Imagine you want to reset the consumer offset to an earlier position. The last
> delta load of offsets 100-200 was technically successful, but you need to run
> it again.
> So you want to set the offset for topic1/partition0 back to 100.
> But you cannot, because the only API that allows changing offsets is the
> consumer API, and that requires subscribing first. With assign only and no
> subscribe, you get a `Commit failed: Broker: Unknown member`.
> But subscribe assigns partitions to you, instead of leaving you in control of
> which partitions you want to change.
>
> Hence the procedure should be:
> * consumer.assign(topicpartitions)
> * consumer.commit(topicpartitions)
> * a rebalance is triggered for all existing consumers to notify them about
> the commit offset change.