prestona commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1602234146
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +127,73 @@ public void start(Map<String, String> props) {
consumerGroups.size(), sourceClusterAlias,
config.targetClusterAlias(), consumerGroups);
}
+ // read the checkpoints topic to initialize the
checkpointsPerConsumerGroup state of this task
+ // the callback may only handle errors thrown by consumer.poll in
KafkaBasedLog
+ // e.g. unauthorized to read from topic (non-retriable)
+ // if any are encountered, treat the loading of Checkpoints as failed.
+ Optional<Map<String, Map<TopicPartition, Checkpoint>>>
readCheckpoints(MirrorCheckpointTaskConfig config) {
+ AtomicBoolean successful = new AtomicBoolean(true);
+ Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new
HashMap<>();
+ Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new
Callback<ConsumerRecord<byte[], byte[]>>() {
+ @Override
+ public void onCompletion(Throwable error, ConsumerRecord<byte[],
byte[]> cpRecord) {
+ if (error != null && successful.getAndSet(false)) {
+ log.error("Error loading Checkpoint topic", error);
Review Comment:
If we special case not authorized (as above), then the main reasons for
hitting this are (hopefully?) transitory problems - for example: all brokers
being down when the connector is first started. I agree that this should be a
warning with a better explanation of impact.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]