VGalaxies commented on code in PR #17780:
URL: https://github.com/apache/iotdb/pull/17780#discussion_r3400587568
##########
integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicOwnerIT.java:
##########
@@ -150,49 +142,24 @@ public void testHeartbeatRenewsOwnerLeaseAndAlterOwnerWaitsForLeaseExpiration()
TopicConstant.OWNER_LEASE_DURATION_MS_KEY,
String.valueOf(ownerLeaseDurationMs));
session.createTopic(topicName, properties);
- final Long initialOwnerLeaseExpireTimeMs = getOwnerLeaseExpireTimeMs(session, topicName);
- Assert.assertNotNull(initialOwnerLeaseExpireTimeMs);
- Assert.assertTrue(initialOwnerLeaseExpireTimeMs > System.currentTimeMillis());
-
- try (final SubscriptionTreePullConsumer currentOwnerConsumer =
- new SubscriptionTreePullConsumer.Builder()
- .host(host)
- .port(port)
- .consumerId("current_sn")
- .consumerGroupId("topic_owner_lease_group")
- .ownerId("sn1")
- .ownerEpoch(5L)
- .heartbeatIntervalMs(heartbeatIntervalMs)
- .autoCommit(false)
- .buildPullConsumer()) {
- currentOwnerConsumer.open();
- currentOwnerConsumer.subscribe(topicName);
-
- final AtomicReference<Long> renewedOwnerLeaseExpireTimeMs =
- new AtomicReference<>(initialOwnerLeaseExpireTimeMs);
- IoTDBSubscriptionITConstant.AWAIT.untilAsserted(
- () -> {
- final Long ownerLeaseExpireTimeMs = getOwnerLeaseExpireTimeMs(session, topicName);
- Assert.assertNotNull(ownerLeaseExpireTimeMs);
- Assert.assertTrue(ownerLeaseExpireTimeMs > initialOwnerLeaseExpireTimeMs);
- renewedOwnerLeaseExpireTimeMs.set(ownerLeaseExpireTimeMs);
- });
-
+ try {
+ // Transferring to a different owner must wait for the old owner's lease to drain on every
+ // DataNode before the new owner is installed. ConfigNode stops renewing and waits at least
+ // the lease duration (measured on its own clock), so the call blocks for >= the lease
+ // duration. This is the admission gate that prevents cross-DataNode double-active
+ // consuming.
final long alterStartTimeMs = System.currentTimeMillis();
session.alterTopicOwner(topicName, "sn2", 6L);
final long alterElapsedTimeMs = System.currentTimeMillis() - alterStartTimeMs;
-
- final long ownerLeaseRemainingTimeMs = renewedOwnerLeaseExpireTimeMs.get() - alterStartTimeMs;
- if (ownerLeaseRemainingTimeMs > 500L) {
- Assert.assertTrue(alterElapsedTimeMs >= ownerLeaseRemainingTimeMs - 250L);
- }
+ Assert.assertTrue(
+ "alterTopicOwner should block at least the lease drain duration, but only took "
+ + alterElapsedTimeMs
+ + "ms",
+ alterElapsedTimeMs >= ownerLeaseDurationMs);
Review Comment:
Good catch; addressed in 55b95686c6 (comment clarified). You're right that the
heartbeat interval matters. The ConfigNode does not wait until a DataNode's
absolute lease-expire time (that would couple the two clocks); instead, it stops
renewal at `blockSince` and waits `leaseDuration (L) + one heartbeat interval (H)`
on its own clock. The extra H is needed because a renewal may have been in
flight to a DataNode right before renewal was stopped, so that DataNode's lease
could expire as late as `blockSince + H + leaseDuration`. Hence the actual wait
is `L + H`, and since `L + H >= L`, `alterElapsedTimeMs >= leaseDuration` holds
as a (loose) lower bound. I've updated the test comment to state the `L + H`
relationship explicitly.
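A minimal Java sketch of the timing argument above, assuming a ConfigNode-side gate that stops renewals at `blockSince` and then blocks for `L + H`, measured only as a duration on its own monotonic clock, before installing the new owner. `LeaseDrainGate`, `drainWaitMs`, `worstCaseLeaseExpiryMs`, and `transferOwner` are hypothetical names for illustration, not IoTDB APIs.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: LeaseDrainGate and its methods are illustrative, not IoTDB APIs.
final class LeaseDrainGate {

  private final long leaseDurationMs; // L: lease length granted by each renewal
  private final long heartbeatIntervalMs; // H: renewal period, bounds a renewal still in flight
  private volatile boolean renewalStopped = false;

  LeaseDrainGate(long leaseDurationMs, long heartbeatIntervalMs) {
    this.leaseDurationMs = leaseDurationMs;
    this.heartbeatIntervalMs = heartbeatIntervalMs;
  }

  /** Total wait after blockSince: L + H, a pure duration on the ConfigNode's own clock. */
  long drainWaitMs() {
    return leaseDurationMs + heartbeatIntervalMs;
  }

  /** Latest time an old lease can still be valid, per the bound blockSince + H + L. */
  long worstCaseLeaseExpiryMs(long blockSinceMs) {
    return blockSinceMs + heartbeatIntervalMs + leaseDurationMs;
  }

  boolean isRenewalStopped() {
    return renewalStopped;
  }

  /**
   * Stops renewals (the blockSince moment), blocks for L + H using System.nanoTime() so that
   * wall-clock adjustments cannot shorten the wait, then installs the new owner.
   * Returns the elapsed wait in milliseconds.
   */
  long transferOwner(Runnable installNewOwner) throws InterruptedException {
    renewalStopped = true; // blockSince: no further renewals are sent to DataNodes
    final long startNanos = System.nanoTime();
    final long deadlineNanos = startNanos + TimeUnit.MILLISECONDS.toNanos(drainWaitMs());
    long remainingNanos;
    // Re-check against the deadline in case a sleep wakes up early.
    while ((remainingNanos = deadlineNanos - System.nanoTime()) > 0) {
      TimeUnit.NANOSECONDS.sleep(remainingNanos);
    }
    installNewOwner.run(); // the new owner is admitted only after the drain window
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
  }
}
```

With L = 50 ms and H = 10 ms, `transferOwner` blocks for at least 60 ms, so a check like the test's `alterElapsedTimeMs >= ownerLeaseDurationMs` (at least 50 ms) is satisfied with H to spare, which is why the reply calls it a loose lower bound.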
--
This is an automated message from the Apache Git Service.