mimaison commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r435134966
##########
File path:
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -16,6 +16,13 @@
*/
package org.apache.kafka.connect.mirror;
+import java.util.HashMap;
Review comment:
can we move these imports with the other `java.util` imports?
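For example, something like this (illustrative only, the exact neighbours depend on the imports already in the file):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
```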
##########
File path:
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -188,4 +219,101 @@ public void commitRecord(SourceRecord record) {
            Checkpoint.unwrapGroup(record.sourcePartition()),
            System.currentTimeMillis() - record.timestamp());
    }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                // sync the offset to the target cluster only if the state of the current consumer group is:
+                // (1) idle: the consumer at the target is not actively consuming the mirrored topic
+                // (2) dead: the consumer group was recently created at the source and never existed at the target
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet().stream().collect(
+                            Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
+                }
+                // a new consumer group upstream has state "DEAD" and will be identified during the offset sync-up
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at the target
+        for (Entry<String, Map<TopicPartition, OffsetAndMetadata>> group : getConvertedUpstreamOffset().entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at the target, read the checkpoints (converted upstream offsets)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = group.getValue();
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            Map<TopicPartition, OffsetAndMetadata> targetConsumerOffset = idleConsumerGroupsOffset.get(consumerGroupId);
+            if (targetConsumerOffset == null) {
+                // this is a new consumer, just sync the offset to the target
+                syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
+                offsetToSyncAll.put(consumerGroupId, convertedUpstreamOffset);
+                continue;
+            }
+
+            for (Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
+
+                TopicPartition topicPartition = convertedEntry.getKey();
+                OffsetAndMetadata convertedOffset = convertedUpstreamOffset.get(topicPartition);
+                if (!targetConsumerOffset.containsKey(topicPartition)) {
+                    // if this is a new topicPartition from upstream, just sync the offset to the target
+                    offsetToSync.put(topicPartition, convertedOffset);
+                    continue;
+                }
+
+                // if the translated offset from upstream is smaller than the current consumer offset
+                // in the target, skip updating the offset for that partition
+                long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
+                if (latestDownstreamOffset >= convertedOffset.offset()) {
+                    log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
Review comment:
`larger or equal`?
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -207,8 +185,30 @@ public void close() {
        backup.stop();
    }
+
+
    @Test
    public void testReplication() throws InterruptedException {
+
+        // create consumers before starting the connectors so we don't need to wait for discovery
+        Consumer<byte[], byte[]> consumer3 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
Review comment:
It's a bit unusual to have `consumer3` and `consumer4` without 1 and 2 =)
##########
File path:
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -79,7 +93,15 @@ public void start(Map<String, String> props) {
        pollTimeout = config.consumerPollTimeout();
        offsetSyncStore = new OffsetSyncStore(config);
        sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
+        targetAdminClient = (KafkaAdminClient) Admin.create(config.targetAdminConfig());
Review comment:
can we change the type of these two fields to `Admin`? Then we don't need the cast
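For illustration, a sketch of the suggested change using the field names from the diff above:

```java
// declaring both admin client fields as the Admin interface avoids the cast
private Admin sourceAdminClient;
private Admin targetAdminClient;

// then in start():
sourceAdminClient = Admin.create(config.sourceAdminConfig());
targetAdminClient = Admin.create(config.targetAdminConfig());
```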
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -64,4 +70,58 @@ public void testCheckpoint() {
        assertEquals(13, checkpoint2.downstreamOffset());
        assertEquals(234L, sourceRecord2.timestamp().longValue());
    }
+
+    @Test
+    public void testSyncOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
+        Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        // 'c1t1' denotes consumer offsets of all partitions of topic1 for consumer1
+        Map<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<>();
+        // 't1p0' denotes topic1, partition 0
+        TopicPartition t1p0 = new TopicPartition(topic1, 0);
+        // 'c1t1p0' denotes the consumer offsets for topic1, partition 0 for consumer1
+        Entry<TopicPartition, OffsetAndMetadata> c1t1p0 = new SimpleEntry<>(t1p0, new OffsetAndMetadata(100, ""));
+
+        c1t1.put(t1p0, new OffsetAndMetadata(100, ""));
+
+        Map<TopicPartition, OffsetAndMetadata> c2t2 = new HashMap<>();
+        TopicPartition t2p0 = new TopicPartition(topic2, 0);
+        Entry<TopicPartition, OffsetAndMetadata> c2t2p0 = new SimpleEntry<>(t2p0, new OffsetAndMetadata(50, ""));
Review comment:
We can use `new OffsetAndMetadata(50)` if we don't set any metadata. Same below.
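For instance, based on the test setup above:

```java
// the single-argument constructor is enough when no metadata is attached
c1t1.put(t1p0, new OffsetAndMetadata(100));
c2t2.put(t2p0, new OffsetAndMetadata(50));
```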
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -316,6 +316,69 @@ public void testReplication() throws InterruptedException {
            backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
    }
+
+    @Test
+    public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedException {
+
+        // create consumers before starting the connectors so we don't need to wait for discovery
+        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-1");
+        consumer1.poll(Duration.ofMillis(500));
+        consumer1.commitSync();
+        consumer1.close();
+
+        // enable automated consumer group offset sync
+        mm2Props.put("sync.group.offsets.enabled", "true");
+        mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        // one-way replication from primary to backup
+        mm2Props.put("backup->primary.enabled", "false");
+
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
+
+        // sleep 5 seconds to ensure the automated group offset sync is complete
+        time.sleep(5000);
+
+        // create a consumer at the backup cluster with the same consumer group ID to consume one topic
+        Consumer<byte[], byte[]> consumer = backup.kafka().createConsumerAndSubscribeTo(
+            Collections.singletonMap("group.id", "consumer-group-1"), "primary.test-topic-1");
+        ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+        // the count of consumed records should be zero, because the offsets of the same consumer group
+        // have been automatically synchronized from primary to backup by the background job, so there are
+        // no more records to consume from the replicated topic by the same consumer group at the backup cluster
+        assertTrue("consumer record size is not zero", records.count() == 0);
+
+        // now create a new topic in the primary cluster
+        primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
+        backup.kafka().createTopic("primary.test-topic-2", 1);
+        // produce some records to the new topic in the primary cluster
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            primary.kafka().produce("test-topic-2", i % NUM_PARTITIONS, "key", "message-1-" + i);
+        }
+
+        // create a consumer at the primary cluster to consume the new topic
+        consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-2");
+        consumer1.poll(Duration.ofMillis(500));
+        consumer1.commitSync();
+        consumer1.close();
+
+        // sleep 5 seconds to ensure the automated group offset sync is complete
+        time.sleep(5000);
+
+        // create a consumer at the backup cluster with the same consumer group ID to consume the old and new topics
+        consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2");
+
+        records = consumer.poll(Duration.ofMillis(500));
+        // same reasoning as above, no more records to consume by the same consumer group at the backup cluster
+        assertTrue("consumer record size is not zero", records.count() == 0);
Review comment:
what about `assertEquals("consumer record size is not zero", 0, records.count());`? It can also be applied in a few other places
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -29,7 +35,7 @@
    @Test
    public void testDownstreamTopicRenaming() {
        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
-            new DefaultReplicationPolicy(), null);
+            new DefaultReplicationPolicy(), null, new HashMap<>(), new HashMap<>());
Review comment:
We could use `Collections.emptyMap()` here and in a few places below
##########
File path:
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -188,4 +219,101 @@ public void commitRecord(SourceRecord record) {
            Checkpoint.unwrapGroup(record.sourcePartition()),
            System.currentTimeMillis() - record.timestamp());
    }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
Review comment:
It looks like we describe all groups just to get the groups in the `EMPTY` state. Can we use the state filter introduced in [KIP-518](https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state) on `listConsumerGroups()` to only get groups in that specific state?
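A rough sketch of what that could look like, assuming the target cluster is on a broker/client version that supports the KIP-518 state filter:

```java
// list only the groups that are already EMPTY on the target cluster,
// instead of describing every replicated group
ListConsumerGroupsOptions options = new ListConsumerGroupsOptions()
    .inStates(Collections.singleton(ConsumerGroupState.EMPTY));
Collection<ConsumerGroupListing> emptyGroups =
    targetAdminClient.listConsumerGroups(options).all().get();
```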
##########
File path:
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -234,6 +243,17 @@ Duration adminTimeout() {
        return props;
    }
+
+    Map<String, Object> targetConsumerConfig() {
+        Map<String, Object> props = new HashMap<>();
+        props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
+        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
+        props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
+        props.put("enable.auto.commit", "false");
Review comment:
Can we use the existing constants for the config names?
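For example, with the constant from `org.apache.kafka.clients.consumer.ConsumerConfig`:

```java
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
```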
##########
File path:
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -64,4 +70,58 @@ public void testCheckpoint() {
        assertEquals(13, checkpoint2.downstreamOffset());
        assertEquals(234L, sourceRecord2.timestamp().longValue());
    }
+
+    @Test
+    public void testSyncOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
+        Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        // 'c1t1' denotes consumer offsets of all partitions of topic1 for consumer1
+        Map<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<>();
+        // 't1p0' denotes topic1, partition 0
+        TopicPartition t1p0 = new TopicPartition(topic1, 0);
+        // 'c1t1p0' denotes the consumer offsets for topic1, partition 0 for consumer1
+        Entry<TopicPartition, OffsetAndMetadata> c1t1p0 = new SimpleEntry<>(t1p0, new OffsetAndMetadata(100, ""));
+
+        c1t1.put(t1p0, new OffsetAndMetadata(100, ""));
+
+        Map<TopicPartition, OffsetAndMetadata> c2t2 = new HashMap<>();
+        TopicPartition t2p0 = new TopicPartition(topic2, 0);
+        Entry<TopicPartition, OffsetAndMetadata> c2t2p0 = new SimpleEntry<>(t2p0, new OffsetAndMetadata(50, ""));
Review comment:
This looks unused, same below for `c2t2p0`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]