gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1602156155
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -103,10 +113,11 @@ public void start(Map<String, String> props) {
targetAdminClient =
config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
metrics = config.metrics();
idleConsumerGroupsOffset = new HashMap<>();
- checkpointsPerConsumerGroup = new HashMap<>();
+ Optional<Map<String, Map<TopicPartition, Checkpoint>>> checkpoints =
readCheckpoints(config);
Review Comment:
This is a potentially long blocking operation, and those should be avoided
in start() methods because while the task is starting, it can't be stopped, and
if the task can't be stopped within `task.shutdown.graceful.timeout.ms` it is
aggressively cancelled.
Since the main thread needs the result from readCheckpoints, I think it
would be fine to check if it's been loaded and if not, just return an empty
poll().
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +127,73 @@ public void start(Map<String, String> props) {
consumerGroups.size(), sourceClusterAlias,
config.targetClusterAlias(), consumerGroups);
}
+ // read the checkpoints topic to initialize the
checkpointsPerConsumerGroup state of this task
+ // the callback may only handle errors thrown by consumer.poll in
KafkaBasedLog
+ // e.g. unauthorized to read from topic (non-retriable)
+ // if any are encountered, treat the loading of Checkpoints as failed.
+ Optional<Map<String, Map<TopicPartition, Checkpoint>>>
readCheckpoints(MirrorCheckpointTaskConfig config) {
+ AtomicBoolean successful = new AtomicBoolean(true);
+ Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new
HashMap<>();
+ Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new
Callback<ConsumerRecord<byte[], byte[]>>() {
+ @Override
+ public void onCompletion(Throwable error, ConsumerRecord<byte[],
byte[]> cpRecord) {
+ if (error != null && successful.getAndSet(false)) {
+ log.error("Error loading Checkpoint topic", error);
Review Comment:
I'm on the fence whether this should be error or warn. It it something that
the user _must_ address? I'm not so sure.
I do think that this should have an actionable recommendation, or an
explanation that the task is gracefully degrading because of this.
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -105,7 +106,13 @@ private KafkaBasedLog<byte[], byte[]>
createBackingStore(MirrorCheckpointConfig
/**
* Start the OffsetSyncStore, blocking until all previous Offset Syncs
have been read from backing storage.
*/
- public void start() {
+ public void start(boolean initializationMustReadToEnd) {
+ this.initializationMustReadToEnd = initializationMustReadToEnd;
+ if (initializationMustReadToEnd) {
+ log.warn("OffsetSyncStore initializationMustReadToEnd = {}",
initializationMustReadToEnd);
Review Comment:
debug level, this is not worth warning about.
:+1: for the variable name
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]