gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1600452206
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
consumerGroups.size(), sourceClusterAlias,
config.targetClusterAlias(), consumerGroups);
}
+ // read the checkpoints topic to initialize the
checkpointsPerConsumerGroup state of this task
+ private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+ class CheckpointRecordHandler {
+ private volatile KafkaException lastLoggedErrorReadingCheckpoints
= null;
+
+ void handle(Throwable error, ConsumerRecord<byte[], byte[]>
cpRecord) {
+ // See KafkaBasedLog.poll : only KafkaException can be passed
as error
+ if (error instanceof KafkaException) {
+ // only log once
+ if (lastLoggedErrorReadingCheckpoints == null ||
!lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+ log.error("Error loading Checkpoint topic", error);
+ lastLoggedErrorReadingCheckpoints = (KafkaException)
error;
+ }
+
+ if (error instanceof RetriableException) {
+ return;
+ } else {
+ throw (KafkaException) error;
+ }
+ } else { // error is null
+ lastLoggedErrorReadingCheckpoints = null;
+ Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
Review Comment:
deserialization can fail due to bad data in the topic
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
consumerGroups.size(), sourceClusterAlias,
config.targetClusterAlias(), consumerGroups);
}
+ // read the checkpoints topic to initialize the
checkpointsPerConsumerGroup state of this task
+ private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+ class CheckpointRecordHandler {
+ private volatile KafkaException lastLoggedErrorReadingCheckpoints
= null;
+
+ void handle(Throwable error, ConsumerRecord<byte[], byte[]>
cpRecord) {
+ // See KafkaBasedLog.poll : only KafkaException can be passed
as error
+ if (error instanceof KafkaException) {
+ // only log once
+ if (lastLoggedErrorReadingCheckpoints == null ||
!lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+ log.error("Error loading Checkpoint topic", error);
+ lastLoggedErrorReadingCheckpoints = (KafkaException)
error;
+ }
+
+ if (error instanceof RetriableException) {
+ return;
+ } else {
+ throw (KafkaException) error;
+ }
+ } else { // error is null
+ lastLoggedErrorReadingCheckpoints = null;
+ Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
+ if (consumerGroups.contains(cp.consumerGroupId())) {
+ Map<TopicPartition, Checkpoint> cps =
checkpointsPerConsumerGroup.computeIfAbsent(cp.consumerGroupId(), ignored ->
new HashMap<>());
+ cps.put(cp.topicPartition(), cp);
+ }
+ }
+ }
+ }
+
+ CheckpointRecordHandler handler = new CheckpointRecordHandler();
+ TopicAdmin cpAdmin = null;
+ KafkaBasedLog<byte[], byte[]> previousCheckpoints = null;
+
+ try {
+ cpAdmin = new TopicAdmin(
+ config.targetAdminConfig("checkpoint-target-admin"),
+
config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")));
+
+ previousCheckpoints = KafkaBasedLog.withExistingClients(
+ config.checkpointsTopic(),
+
MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)),
+ null,
+ cpAdmin,
+ (error, cpRecord) -> handler.handle(error, cpRecord),
+ Time.SYSTEM,
+ ignored -> { },
+ topicPartition -> topicPartition.partition() == 0);
+
+ log.info("Starting loading Checkpoint topic : {}",
config.checkpointsTopic());
+ previousCheckpoints.start(true);
+ previousCheckpoints.stop();
+ log.info("Finished loading Checkpoint topic : {}",
config.checkpointsTopic());
+ log.debug("Initial checkpointsPerConsumerGroup : {}",
checkpointsPerConsumerGroup);
+ return true;
+ } catch (KafkaException kexc) {
Review Comment:
This relies on the fact that exceptions thrown from callbacks propagate
synchronously to start(), which is an implementation detail of the
KafkaBasedLog. It would make more sense to me if this class expected exceptions
to be propagated via the callback, and then tested to see if the callback
recorded any exceptions. At the moment the state variable in your handler is
just a log-deduplication mechanism, but it feels like it should alter the
control flow.
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
consumerGroups.size(), sourceClusterAlias,
config.targetClusterAlias(), consumerGroups);
}
+ // read the checkpoints topic to initialize the
checkpointsPerConsumerGroup state of this task
+ private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+ class CheckpointRecordHandler {
+ private volatile KafkaException lastLoggedErrorReadingCheckpoints
= null;
+
+ void handle(Throwable error, ConsumerRecord<byte[], byte[]>
cpRecord) {
+ // See KafkaBasedLog.poll : only KafkaException can be passed
as error
+ if (error instanceof KafkaException) {
+ // only log once
+ if (lastLoggedErrorReadingCheckpoints == null ||
!lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+ log.error("Error loading Checkpoint topic", error);
+ lastLoggedErrorReadingCheckpoints = (KafkaException)
error;
+ }
+
+ if (error instanceof RetriableException) {
+ return;
+ } else {
+ throw (KafkaException) error;
+ }
+ } else { // error is null
+ lastLoggedErrorReadingCheckpoints = null;
+ Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
+ if (consumerGroups.contains(cp.consumerGroupId())) {
+ Map<TopicPartition, Checkpoint> cps =
checkpointsPerConsumerGroup.computeIfAbsent(cp.consumerGroupId(), ignored ->
new HashMap<>());
Review Comment:
This map will need to be thread-safe now.
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
consumerGroups.size(), sourceClusterAlias,
config.targetClusterAlias(), consumerGroups);
}
+ // read the checkpoints topic to initialize the
checkpointsPerConsumerGroup state of this task
+ private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+ class CheckpointRecordHandler {
+ private volatile KafkaException lastLoggedErrorReadingCheckpoints
= null;
+
+ void handle(Throwable error, ConsumerRecord<byte[], byte[]>
cpRecord) {
+ // See KafkaBasedLog.poll : only KafkaException can be passed
as error
Review Comment:
   I think this assumption is detrimental, and this code would be broken if the
assumption is violated in the future. We can just broaden the types to
Throwable so that if a non-KafkaException comes through the callback, we handle
it properly (instead of assuming it's null).
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -104,8 +105,13 @@ private KafkaBasedLog<byte[], byte[]>
createBackingStore(MirrorCheckpointConfig
/**
* Start the OffsetSyncStore, blocking until all previous Offset Syncs
have been read from backing storage.
+ * @param optimisticLoading
*/
- public void start() {
+ public void start(boolean optimisticLoading) {
+ this.pessimisticLoading = !optimisticLoading;
Review Comment:
Is there any reason that the callers of start are "optimistic" and the
implementation is "pessimistic"? Would it make sense for everyone to share the
interpretation of this boolean?
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -39,10 +39,12 @@ static class FakeOffsetSyncStore extends OffsetSyncStore {
super();
}
- @Override
- public void start() {
Review Comment:
Okay yeah this start method is clunky. I would still like to have
OffsetSyncStoreTest use the real start method though. I think a more idiomatic
way of doing this would be to move `backingStore.start()` to a protected
startBackingStore() method that can be overridden with a no-op in
FakeOffsetSyncStore.
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
consumerGroups.size(), sourceClusterAlias,
config.targetClusterAlias(), consumerGroups);
}
+ // read the checkpoints topic to initialize the
checkpointsPerConsumerGroup state of this task
+ private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+ class CheckpointRecordHandler {
Review Comment:
   This is my first time seeing a local class in this codebase; it's a little
bit strange.
   I would make this an anonymous class of Callback<ConsumerRecord<byte[],
byte[]>> that uses an AtomicReference<KafkaException> instead to track the
mutable state.
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
consumerGroups.size(), sourceClusterAlias,
config.targetClusterAlias(), consumerGroups);
}
+ // read the checkpoints topic to initialize the
checkpointsPerConsumerGroup state of this task
+ private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+ class CheckpointRecordHandler {
+ private volatile KafkaException lastLoggedErrorReadingCheckpoints
= null;
+
+ void handle(Throwable error, ConsumerRecord<byte[], byte[]>
cpRecord) {
+ // See KafkaBasedLog.poll : only KafkaException can be passed
as error
+ if (error instanceof KafkaException) {
+ // only log once
+ if (lastLoggedErrorReadingCheckpoints == null ||
!lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+ log.error("Error loading Checkpoint topic", error);
+ lastLoggedErrorReadingCheckpoints = (KafkaException)
error;
+ }
+
+ if (error instanceof RetriableException) {
+ return;
+ } else {
+ throw (KafkaException) error;
Review Comment:
Typically callbacks don't rethrow the exceptions, they're meant to be
informed about the errors, but leave the control flow for the caller to handle.
I see that this is used to stop the KafkaBasedLog on receiving any fatal error,
which is definitely needed in this case.
Rather than having this control flow caused by exceptions and risk
ungracefully killing the background thread:
https://github.com/apache/kafka/blob/57d30d3450998465177f92516a41218dbe8d4340/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L609
maybe we could split the current `stop()` into an asynchronous stop which is
called from the callback, and a blocking stop which other threads can use to
wait for the background thread to exit and everything to be cleaned up.
I'm not totally satisfied with that design either, so if you've got some
other ideas, please explore it more.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]