[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841134#comment-17841134
 ] 

Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:13 AM:
-----------------------------------------------------------------

Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
- polls 100 messages
- commitSync
- waits 1 sec
e.g.

 {{           
while (...) {
                ConsumerRecords<String, String> crs1 = 
consumer.poll(Duration.ofMillis(1000L));
                   // print(crs1, consumer); print last record of polled
                    consumer.commitSync(Duration.ofSeconds(10));
                Thread.sleep(1000L);
          }
}}

the first checkpoint is only emitted when the consumer catches up fully at 
10000.
Then other 10000 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have

{{
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=10000, downstreamOffset=10000, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=20000, downstreamOffset=20000, metadata=}
}}

 [^connect.log.2024-04-26-10.zip] 


was (Author: ecomar):
Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
- polls 100 messages
- commitSync
- waits 1 sec
e.g.
```
 {{           while (...) {
                ConsumerRecords<String, String> crs1 = 
consumer.poll(Duration.ofMillis(1000L));
                   // print(crs1, consumer); print last record of polled
                    consumer.commitSync(Duration.ofSeconds(10));
                Thread.sleep(1000L);
          }
}}```

the first checkpoint is only emitted when the consumer catches up fully at 
10000.
Then other 10000 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have
```
{{Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=10000, downstreamOffset=10000, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, 
upstreamOffset=20000, downstreamOffset=20000, metadata=}}}
```
 [^connect.log.2024-04-26-10.zip] 

> Mirromaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-16622
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16622
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.7.0, 3.6.2, 3.8.0
>            Reporter: Edoardo Comar
>            Priority: Major
>         Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup :
> Tested with a standalone single process MirrorMaker2 mirroring between two 
> single-node kafka clusters(mirromaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 10000 records)
> start a slow consumer - e.g. fetching 50records/poll and pausing 1 sec 
> between polls which commits after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
>   --topic source.checkpoints.internal \
>   --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
>    --from-beginning
> -> no record appears in the checkpoint topic until the consumer reaches the 
> end of the topic (ie its consumer group lag gets down to 0).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to