[ 
https://issues.apache.org/jira/browse/KAFKA-9136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzhisheng updated KAFKA-9136:
---------------------------------
    Comment: was deleted

(was: {code:java}
public static long lastCommitTimestamp(String groupId, String bootstrapServers) {
    // Partition of __consumer_offsets that holds this group's commits.
    int partition = Math.abs(groupId.hashCode() % 50);
    TopicPartition tp = new TopicPartition("__consumer_offsets", partition);

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        consumer.assign(Arrays.asList(tp));
        consumer.seekToEnd(Collections.singletonList(tp));
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(30));
        if (records.count() == 0) {
            return -1;
        }
        for (ConsumerRecord<byte[], byte[]> record : records) {
            BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
            if (baseKey instanceof OffsetKey) {
                OffsetKey offsetKey = (OffsetKey) baseKey;
                if ("one_topic".equals(offsetKey.key().topicPartition().topic())) {
                    OffsetAndMetadata offsetAndMetadata =
                        GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                    // Why is commitTimestamp the current timestamp?
                    long commitTimestamp = offsetAndMetadata.commitTimestamp();
                }
            }
        }
        return records.iterator().next().timestamp();
    }
}
{code})
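
The deleted comment above picks the __consumer_offsets partition with Math.abs(groupId.hashCode() % 50). The following is a minimal, standalone sketch of that mapping, not the broker's actual code. It assumes the broker keeps the default offsets.topic.num.partitions of 50, and it assumes the group coordinator maps Integer.MIN_VALUE to 0 before taking the modulus. The class and method names (OffsetsPartition, nonNegative, partitionForHash, partitionFor) are hypothetical and only serve the illustration.

```java
final class OffsetsPartition {
    // Assumption: default value of the broker setting offsets.topic.num.partitions.
    static final int DEFAULT_OFFSETS_PARTITIONS = 50;

    // Non-negative view of a hash code. Math.abs(Integer.MIN_VALUE) is still
    // negative, so that single value is mapped to 0 explicitly.
    static int nonNegative(int hash) {
        return hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
    }

    // Partition index for an already computed hash code.
    static int partitionForHash(int hash, int numPartitions) {
        return nonNegative(hash) % numPartitions;
    }

    // Partition index of __consumer_offsets assumed to hold the group's commits.
    static int partitionFor(String groupId, int numPartitions) {
        return partitionForHash(groupId.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Prints the partition index for the group named in the issue.
        System.out.println(partitionFor("c_group", DEFAULT_OFFSETS_PARTITIONS));
    }
}
```

For every hash except Integer.MIN_VALUE this gives the same result as Math.abs(hash % 50). For Integer.MIN_VALUE the expression in the comment yields 48, while this sketch yields 0, so the two can point at different partitions in that one case. If the cluster overrides offsets.topic.num.partitions, the second argument has to match the configured value.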

>  get consumer latest committed timestamp
> ----------------------------------------
>
>                 Key: KAFKA-9136
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9136
>             Project: Kafka
>          Issue Type: Wish
>          Components: consumer
>    Affects Versions: 2.3.0
>            Reporter: zhangzhisheng
>            Priority: Major
>
> For example, I have two topics, one_topic and two_topic, each with two 
> partitions, and the consumer group 'c_group' subscribes to both topics. How 
> can I get the latest committed timestamp for c_group?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
